Просмотр исходного кода

Merge pull request #282 from emqx/field_lowercase

Add support for http pull source
ngjaying 4 лет назад
Родитель
Сommit
970ec77d63

+ 107 - 0
common/http_util.go

@@ -0,0 +1,107 @@
+package common
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"github.com/emqx/kuiper/xstream/api"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"strings"
+)
+
+var BodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
+func Send(logger api.Logger, client *http.Client, bodyType string, method string, u string, headers map[string]string, sendSingle bool, v interface{}) ([]byte, error){
+	var req *http.Request
+	var err error
+	switch bodyType {
+	case "none":
+		req, err = http.NewRequest(method, u, nil)
+		if err != nil {
+			return nil, fmt.Errorf("fail to create request: %v", err)
+		}
+	case "json", "text", "javascript", "html", "xml":
+		var body = &(bytes.Buffer{})
+		switch t := v.(type) {
+		case []byte:
+			body = bytes.NewBuffer(t)
+		default:
+			return nil, fmt.Errorf("invalid content: %v", v)
+		}
+		req, err = http.NewRequest(method, u, body)
+		if err != nil {
+			return nil, fmt.Errorf("fail to create request: %v", err)
+		}
+		req.Header.Set("Content-Type", BodyTypeMap[bodyType])
+	case "form":
+		form := url.Values{}
+		im, err := convertToMap(v, sendSingle)
+		if err != nil {
+			return nil, err
+		}
+		for key, value := range im {
+			var vstr string
+			switch value.(type) {
+			case []interface{}, map[string]interface{}:
+				if temp, err := json.Marshal(value); err != nil {
+					return nil, fmt.Errorf("fail to parse fomr value: %v", err)
+				} else {
+					vstr = string(temp)
+				}
+			default:
+				vstr = fmt.Sprintf("%v", value)
+			}
+			form.Set(key, vstr)
+		}
+		body := ioutil.NopCloser(strings.NewReader(form.Encode()))
+		req, err = http.NewRequest(method, u, body)
+		if err != nil {
+			return nil, fmt.Errorf("fail to create request: %v", err)
+		}
+		req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
+	default:
+		return nil, fmt.Errorf("unsupported body type %s", bodyType)
+	}
+
+	if len(headers) > 0 {
+		for k, v := range headers {
+			req.Header.Set(k, v)
+		}
+	}
+	logger.Debugf("do request: %s %s with %s", method, u, req.Body)
+	resp, err := client.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("rest sink fails to send out the data")
+	} else {
+		Log.Debugf("rest sink got response %v", resp)
+		if resp.StatusCode < 200 || resp.StatusCode > 299 {
+			return nil, fmt.Errorf("rest sink fails to err http return code: %d.", resp.StatusCode)
+		}
+		defer resp.Body.Close()
+		if body, err := ioutil.ReadAll(resp.Body); err != nil {
+			return nil, fmt.Errorf("rest sink fails to err response content: %s.", err)
+		} else {
+			return body, nil
+		}
+	}
+	return nil, nil
+}
+
+func convertToMap(v interface{}, sendSingle bool) (map[string]interface{}, error) {
+	switch t := v.(type) {
+	case []byte:
+		r := make(map[string]interface{})
+		if err := json.Unmarshal(t, &r); err != nil {
+			if sendSingle {
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			} else {
+				r["result"] = string(t)
+			}
+		}
+		return r, nil
+	default:
+		return nil, fmt.Errorf("invalid content: %v", v)
+	}
+	return nil, fmt.Errorf("invalid content: %v", v)
+}

+ 3 - 2
docs/en_US/rules/overview.md

@@ -39,15 +39,16 @@ The identification of the rule. The rule name cannot be duplicated in the same K
 
 The sql query to run for the rule. 
 
-- Kuiper provides embeded following 2 sources,
+- Kuiper provides embeded following 3 sources,
   - MQTT source, see  [MQTT source stream](sources/mqtt.md) for more detailed info.
   - EdgeX source by default is shipped in [docker images](https://hub.docker.com/r/emqx/kuiper), but NOT included in single download binary files, you use ``make pkg_with_edgex`` command to build a binary package that supports EdgeX source. Please see [EdgeX source stream](sources/edgex.md) for more detailed info.
+  - HTTP pull source, regularly pull the contents at user's specified interval time, see [here](sources/http_pull.md) for more detailed info. 
 - See [SQL](../sqls/overview.md) for more info of Kuiper SQL.
 - Sources can be customized, see [extension](../extension/overview.md) for more detailed info.
 
 ### sinks/actions
 
-Currently, 3 kinds of sinks/actions are supported:
+Currently, below kinds of sinks/actions are supported:
 
 - [log](sinks/logs.md): Send the result to log file.
 - [mqtt](sinks/mqtt.md): Send the result to an MQTT broker. 

+ 83 - 0
docs/en_US/rules/sources/http_pull.md

@@ -0,0 +1,83 @@
+# HTTP pull source 
+
+Kuiper provides built-in support for pulling HTTP source stream, which can pull the message from HTTP server broker and feed into the Kuiper processing pipeline.  The configuration file of HTTP pull source is at ``etc/sources/httppull.yaml``. Below is the file format.
+
+```yaml
+#Global httppull configurations
+default:
+  # url of the request server address
+  url: http://localhost
+  # post, get, put, delete
+  method: post
+  # The interval between the requests, time unit is ms
+  interval: 10000
+  # The timeout for http request, time unit is ms
+  timeout: 5000
+  # If it's set to true, then will compare with last result; If response of two requests are the same, then will skip sending out the result.
+  # The possible setting could be: true/false
+  incremental: false
+  # The body of request, such as '{"data": "data", "method": 1}'
+  body: '{}'
+  # Body type, none|text|json|html|xml|javascript|form
+  bodyType: json
+  # HTTP headers required for the request
+  headers:
+    Accept: application/json
+
+#Override the global configurations
+application_conf: #Conf_key
+  incremental: true
+  url: http://localhost:9090/pull
+```
+
+## Global HTTP pull configurations
+
+Use can specify the global HTTP pull settings here. The configuration items specified in ``default`` section will be taken as default settings for all HTTP connections. 
+
+### url
+
+The URL where to get the result.
+
+### method
+HTTP method, it could be post, get, put & delete.
+
+### interval
+
+The interval between the requests, time unit is ms.
+
+### timeout
+
+The timeout for http request, time unit is ms.
+
+### incremental
+
+If it's set to true, then will compare with last result; If response of two requests are the same, then will skip sending out the result.
+
+### body
+
+The body of request, such as `'{"data": "data", "method": 1}'`
+
+### bodyType
+
+Body type, it could be none|text|json|html|xml|javascript|form.
+
+### headers
+
+The HTTP request headers that you want to send along with the HTTP request.
+
+
+
+## Override the default settings
+
+If you have a specific connection that need to overwrite the default settings, you can create a customized section. In the previous sample, we create a specific setting named with ``application_conf``.  Then you can specify the configuration with option ``CONF_KEY`` when creating the stream definition (see [stream specs](../../sqls/streams.md) for more info).
+
+**Sample**
+
+```
+demo (
+		...
+	) WITH (DATASOURCE="test/", FORMAT="JSON", TYPE="httppull", KEY="USERID", CONF_KEY="application_conf");
+```
+
+The configuration keys used for these specific settings are the same as in ``default`` settings, any values specified in specific settings will overwrite the values in ``default`` section.
+

+ 4 - 1
docs/zh_CN/rules/overview.md

@@ -39,7 +39,10 @@
 
 为规则运行的sql查询。
 
-- Kuiper支持嵌入式MQTT源,有关更多详细信息,请参阅[MQTT source stream](sources/mqtt.md)。
+- Kuiper内置支持以下 3 种源
+    - MQTT 源,有关更多详细信息,请参阅[MQTT source stream](sources/mqtt.md)。
+    - EdgeX 源缺省是包含在[容器镜像](https://hub.docker.com/r/emqx/kuiper)中发布的,但是没有包含在单独下载的二进制包中,您可以使用 ``make pkg_with_edgex`` 命令来编译出一个支持 EdgeX 源的程序。更多关于它的详细信息,请参考 [EdgeX source stream](sources/edgex.md)。
+    - HTTP 定时拉取源,按照用户指定的时间间隔,定时从 HTTP 服务器中拉取数据,更多详细信息,请参考[这里](sources/http_pull.md) 。 
 - 有关Kuiper SQL的更多信息,请参阅[SQL](../sqls/overview.md)。
 - 可以自定义来源,请参阅 [extension](../extension/overview.md)了解更多详细信息。
 

+ 83 - 0
docs/zh_CN/rules/sources/http_pull.md

@@ -0,0 +1,83 @@
+# HTTP pull source 
+
+Kuiper provides built-in support for pulling HTTP source stream, which can pull the message from HTTP server broker and feed into the Kuiper processing pipeline.  The configuration file of HTTP pull source is at ``etc/sources/httppull.yaml``. Below is the file format.
+
+```yaml
+#Global httppull configurations
+default:
+  # url of the request server address
+  url: http://localhost
+  # post, get, put, delete
+  method: post
+  # The interval between the requests, time unit is ms
+  interval: 10000
+  # The timeout for http request, time unit is ms
+  timeout: 5000
+  # If it's set to true, then will compare with last result; If response of two requests are the same, then will skip sending out the result.
+  # The possible setting could be: true/false
+  incremental: false
+  # The body of request, such as '{"data": "data", "method": 1}'
+  body: '{}'
+  # Body type, none|text|json|html|xml|javascript|form
+  bodyType: json
+  # HTTP headers required for the request
+  headers:
+    Accept: application/json
+
+#Override the global configurations
+application_conf: #Conf_key
+  incremental: true
+  url: http://localhost:9090/pull
+```
+
+## Global HTTP pull configurations
+
+Use can specify the global HTTP pull settings here. The configuration items specified in ``default`` section will be taken as default settings for all HTTP connections. 
+
+### url
+
+The URL where to get the result.
+
+### method
+HTTP method, it could be post, get, put & delete.
+
+### interval
+
+The interval between the requests, time unit is ms.
+
+### timeout
+
+The timeout for http request, time unit is ms.
+
+### incremental
+
+If it's set to true, then will compare with last result; If response of two requests are the same, then will skip sending out the result.
+
+### body
+
+The body of request, such as `'{"data": "data", "method": 1}'`
+
+### bodyType
+
+Body type, it could be none|text|json|html|xml|javascript|form.
+
+### headers
+
+The HTTP request headers that you want to send along with the HTTP request.
+
+
+
+## Override the default settings
+
+If you have a specific connection that need to overwrite the default settings, you can create a customized section. In the previous sample, we create a specific setting named with ``application_conf``.  Then you can specify the configuration with option ``CONF_KEY`` when creating the stream definition (see [stream specs](../../sqls/streams.md) for more info).
+
+**Sample**
+
+```
+demo (
+		...
+	) WITH (DATASOURCE="test/", FORMAT="JSON", TYPE="httppull", KEY="USERID", CONF_KEY="application_conf");
+```
+
+The configuration keys used for these specific settings are the same as in ``default`` settings, any values specified in specific settings will overwrite the values in ``default`` section.
+

+ 25 - 0
etc/sources/httppull.yaml

@@ -0,0 +1,25 @@
+#Global httppull configurations
+default:
+  # url of the request server address
+  url: http://localhost
+  # post, get, put, delete
+  method: post
+  # The interval between the requests, time unit is ms
+  interval: 10000
+  # The timeout for http request, time unit is ms
+  timeout: 5000
+  # If it's set to true, then will compare with last result; If response of two requests are the same, then will skip sending out the result.
+  # The possible setting could be: true/false
+  incremental: false
+  # The body of request, such as '{"data": "data", "method": 1}'
+  body: '{}'
+  # Body type, none|text|json|html|xml|javascript|form
+  bodyType: json
+  # HTTP headers required for the request
+  headers:
+    Accept: application/json
+
+#Override the global configurations
+application_conf: #Conf_key
+  incremental: true
+  url: http://localhost:9090/pull

+ 10 - 1
fvt_scripts/README.md

@@ -193,4 +193,13 @@ For most of scripts, you can just start JMeter by default way, such as ``bin/jme
   # go build -o fvt_scripts/edgex/sub/sub fvt_scripts/edgex/sub/sub.go 
   ```
 
-  
+- [An end to end plugin test](plugin_end_2_end.jmx)
+  The script is an end-2-end plugin test. It requires a mock http server, and also a plugin.
+    ```shell
+    # go build -o fvt_scripts/plugins/service/http_server fvt_scripts/plugins/service/server.go 
+    ```
+
+- [Pull HTTP test](http_pull_rule.jmx)
+  
+  The test script verifies HTTP pull source. It sends request to a [server](fvt_scripts/plugins/service/server.go). The script set incremental to true, so it will compare with last result; If response of two requests are the same, then will skip sending out the result.
+  This script also requires to run [server](fvt_scripts/plugins/service/server.go), please refer to last testcase for how to compile and run.

+ 451 - 0
fvt_scripts/http_pull_rule.jmx

@@ -0,0 +1,451 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<jmeterTestPlan version="1.2" properties="4.0" jmeter="4.0 r1823414">
+  <hashTree>
+    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Test Plan" enabled="true">
+      <stringProp name="TestPlan.comments"></stringProp>
+      <boolProp name="TestPlan.functional_mode">false</boolProp>
+      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
+      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
+      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments"/>
+      </elementProp>
+      <stringProp name="TestPlan.user_define_classpath"></stringProp>
+    </TestPlan>
+    <hashTree>
+      <Arguments guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments">
+          <elementProp name="srv" elementType="Argument">
+            <stringProp name="Argument.name">srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="rest_port" elementType="Argument">
+            <stringProp name="Argument.name">rest_port</stringProp>
+            <stringProp name="Argument.value">9081</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="mqtt_srv" elementType="Argument">
+            <stringProp name="Argument.name">mqtt_srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+        </collectionProp>
+      </Arguments>
+      <hashTree/>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Rules" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <TransactionController guiclass="TransactionControllerGui" testclass="TransactionController" testname="API" enabled="true">
+          <boolProp name="TransactionController.includeTimers">false</boolProp>
+          <boolProp name="TransactionController.parent">false</boolProp>
+        </TransactionController>
+        <hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateStream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+&quot;sql&quot; : &quot;create stream demo (Temperature float, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, TYPE=\&quot;httppull\&quot; DATASOURCE=\&quot;devices/+/messages\&quot; Conf_key=\&quot;application_conf\&quot;)&quot;&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-1754954177">Stream demo is created.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+  &quot;id&quot;: &quot;rule1&quot;,&#xd;
+  &quot;sql&quot;: &quot;SELECT * FROM demo&quot;,&#xd;
+  &quot;actions&quot;: [&#xd;
+    {&#xd;
+      &quot;log&quot;: {}&#xd;
+    },&#xd;
+    {&#xd;
+      &quot;mqtt&quot;: {&#xd;
+        &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
+        &quot;topic&quot;: &quot;devices/result&quot;,&#xd;
+        &quot;qos&quot;: 1,&#xd;
+        &quot;clientId&quot;: &quot;demo_001&quot;&#xd;
+      }&#xd;
+    }&#xd;
+  ]&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-2022196798">Rule rule1 was created</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">0</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">1</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.sink_mqtt_1_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">1</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="固定定时器" enabled="true">
+              <stringProp name="ConstantTimer.delay">23000</stringProp>
+            </ConstantTimer>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_DropRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="717250485">Rule rule1 is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_Drop_Stream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams/demo</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="287881319">Stream demo is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+            <stringProp name="ConstantTimer.delay">500</stringProp>
+          </ConstantTimer>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Result" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <OnceOnlyController guiclass="OnceOnlyControllerGui" testclass="OnceOnlyController" testname="Once Only Controller" enabled="true"/>
+        <hashTree>
+          <net.xmeter.samplers.ConnectSampler guiclass="net.xmeter.gui.ConnectSamplerUI" testclass="net.xmeter.samplers.ConnectSampler" testname="MQTT Connect" enabled="true">
+            <stringProp name="mqtt.server">${mqtt_srv}</stringProp>
+            <stringProp name="mqtt.port">1883</stringProp>
+            <stringProp name="mqtt.version">3.1</stringProp>
+            <stringProp name="mqtt.conn_timeout">10</stringProp>
+            <boolProp name="mqtt.private_protocol">false</boolProp>
+            <stringProp name="mqtt.listener_timeout">10</stringProp>
+            <stringProp name="mqtt.protocol">TCP</stringProp>
+            <boolProp name="mqtt.dual_ssl_authentication">false</boolProp>
+            <stringProp name="mqtt.keystore_file_path"></stringProp>
+            <stringProp name="mqtt.keystore_password"></stringProp>
+            <stringProp name="mqtt.clientcert_file_path"></stringProp>
+            <stringProp name="mqtt.clientcert_password"></stringProp>
+            <stringProp name="mqtt.user_name"></stringProp>
+            <stringProp name="mqtt.password"></stringProp>
+            <stringProp name="mqtt.client_id_prefix">conn_</stringProp>
+            <boolProp name="mqtt.client_id_suffix">true</boolProp>
+            <stringProp name="mqtt.conn_keep_alive">300</stringProp>
+            <stringProp name="mqtt.conn_attampt_max">0</stringProp>
+            <stringProp name="mqtt.reconn_attampt_max">0</stringProp>
+          </net.xmeter.samplers.ConnectSampler>
+          <hashTree/>
+        </hashTree>
+        <net.xmeter.samplers.SubSampler guiclass="net.xmeter.gui.SubSamplerUI" testclass="net.xmeter.samplers.SubSampler" testname="AnalysisResult" enabled="true">
+          <stringProp name="mqtt.topic_name">devices/result</stringProp>
+          <stringProp name="mqtt.qos_level">0</stringProp>
+          <boolProp name="mqtt.add_timestamp">false</boolProp>
+          <boolProp name="mqtt.debug_response">true</boolProp>
+          <stringProp name="mqtt.sample_condition">specified elapsed time (ms)</stringProp>
+          <stringProp name="mqtt.sample_condition_value">25000</stringProp>
+        </net.xmeter.samplers.SubSampler>
+        <hashTree>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$[0].Temperature</stringProp>
+            <stringProp name="EXPECTED_VALUE">${temperature}</stringProp>
+            <boolProp name="JSONVALIDATION">false</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="humidity Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$[0].humidity</stringProp>
+            <stringProp name="EXPECTED_VALUE">${humidity}</stringProp>
+            <boolProp name="JSONVALIDATION">false</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+    </hashTree>
+  </hashTree>
+</jmeterTestPlan>

+ 41 - 0
fvt_scripts/plugins/service/server.go

@@ -2,9 +2,13 @@ package main
 
 import (
 	"bytes"
+	"encoding/json"
+	"fmt"
 	"io/ioutil"
 	"log"
+	"math/rand"
 	"net/http"
+	"time"
 )
 
 func alert(w http.ResponseWriter, req *http.Request) {
@@ -19,9 +23,46 @@ func alert(w http.ResponseWriter, req *http.Request) {
 	log.Printf("BODY: %q", rdr1)
 }
 
+var count = 0
+
+type Sensor struct {
+	Temperature int `json: "temperature""`
+	Humidity    int `json: "humidiy"`
+}
+
+var s = &Sensor{}
+func pullSrv(w http.ResponseWriter, req *http.Request) {
+	buf, bodyErr := ioutil.ReadAll(req.Body)
+	if bodyErr != nil {
+		log.Print("bodyErr ", bodyErr.Error())
+		http.Error(w, bodyErr.Error(), http.StatusInternalServerError)
+		return
+	} else {
+		fmt.Println(string(buf))
+	}
+
+	if count % 2 == 0 {
+		rand.Seed(time.Now().UnixNano())
+		s.Temperature = rand.Intn(100)
+		s.Humidity = rand.Intn(100)
+	}
+	fmt.Printf("%v\n", s)
+	count++
+	sd, err := json.Marshal(s)
+	if err != nil {
+		fmt.Println(err)
+		return
+	} else {
+		if _, e := fmt.Fprintf(w, "%s", sd); e != nil {
+			fmt.Println(e)
+		}
+	}
+}
+
 func main() {
 	http.Handle("/", http.FileServer(http.Dir("web")))
 	http.HandleFunc("/alert", alert)
+	http.HandleFunc("/pull", pullSrv)
 
 	http.ListenAndServe(":9090", nil)
 }

+ 3 - 0
fvt_scripts/run_jmeter.sh

@@ -87,3 +87,6 @@ fi
 
 /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/plugin_end_2_end.jmx -Dfvt="$fvt_dir" -l jmeter_logs/plugin_end_2_end.jtl -j jmeter_logs/plugin_end_2_end.log
 echo -e "---------------------------------------------\n"
+
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/http_pull_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/http_pull_rule.jtl -j jmeter_logs/http_pull_rule.log
+echo -e "---------------------------------------------\n"

+ 1 - 1
xstream/extensions/edgex_source.go

@@ -71,7 +71,7 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 		if ops1, ok1 := ops.(map[interface{}]interface{}); ok1 {
 			for k, v := range ops1 {
 				k1 := k.(string)
-				if cv, ok := castToString(v); ok {
+				if cv, ok := CastToString(v); ok {
 					optional[k1] = cv
 				} else {
 					common.Log.Infof("Cannot convert configuration %s: %s to string type.\n", k, v)

+ 4 - 4
xstream/extensions/edgex_source_test.go

@@ -180,16 +180,16 @@ func TestWrongValue(t *testing.T) {
 }
 
 func TestCastToString(t *testing.T) {
-	if v, ok := castToString(12); v != "12" || !ok {
+	if v, ok := CastToString(12); v != "12" || !ok {
 		t.Errorf("Failed to cast int.")
 	}
-	if v, ok := castToString(true); v != "true" || !ok {
+	if v, ok := CastToString(true); v != "true" || !ok {
 		t.Errorf("Failed to cast bool.")
 	}
-	if v, ok := castToString("hello"); v != "hello" || !ok {
+	if v, ok := CastToString("hello"); v != "hello" || !ok {
 		t.Errorf("Failed to cast string.")
 	}
-	if v, ok := castToString(12.3); v != "12.30" || !ok {
+	if v, ok := CastToString(12.3); v != "12.30" || !ok {
 		t.Errorf("Failed to cast float.")
 	}
 }

+ 178 - 0
xstream/extensions/httppull_source.go

@@ -0,0 +1,178 @@
+package extensions
+
+import (
+	"crypto/md5"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/api"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+)
+
+const DEFAULT_INTERVAL = 10000
+const DEFAULT_TIMEOUT = 5000
+
+type HTTPPullSource struct {
+	url         string
+	method      string
+	interval    int
+	timeout     int
+	incremental bool
+	body        string
+	bodyType    string
+	headers     map[string]string
+
+	client *http.Client
+}
+
+var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
+
+func (hps *HTTPPullSource) Configure(device string, props map[string]interface{}) error {
+	hps.url = "http://localhost";
+	if u, ok := props["url"]; ok {
+		if p, ok := u.(string); ok {
+			hps.url = p
+		}
+	}
+
+	hps.method = http.MethodGet
+	if m, ok := props["method"]; ok {
+		if m1, ok1 := m.(string); ok1 {
+			switch strings.ToUpper(m1) {
+			case http.MethodGet, http.MethodPost, http.MethodPut, http.MethodDelete:
+				hps.method = m1
+			default:
+				return fmt.Errorf("Not supported HTTP method %s.", m1)
+			}
+		}
+	}
+
+	hps.interval = DEFAULT_INTERVAL
+	if i, ok := props["interval"]; ok {
+		if i1, ok1 := i.(int); ok1 {
+			hps.interval = i1
+		} else {
+			return fmt.Errorf("Not valid interval value %v.", i1)
+		}
+	}
+
+	hps.timeout = DEFAULT_TIMEOUT
+	if i, ok := props["timeout"]; ok {
+		if i1, ok1 := i.(int); ok1 {
+			hps.timeout = i1
+		} else {
+			return fmt.Errorf("Not valid timeout value %v.", i1)
+		}
+	}
+
+	hps.incremental = false
+	if i, ok := props["incremental"]; ok {
+		if i1, ok1 := i.(bool); ok1 {
+			hps.incremental = i1
+		} else {
+			return fmt.Errorf("Not valid incremental value %v.", i1)
+		}
+	}
+
+	hps.bodyType = "json"
+	if c, ok := props["bodyType"]; ok {
+		if c1, ok1 := c.(string); ok1 {
+			if _, ok2 := bodyTypeMap[strings.ToLower(c1)]; ok2 {
+				hps.bodyType = strings.ToLower(c1)
+			} else {
+				return fmt.Errorf("Not valid body type value %v.", c)
+			}
+		} else {
+			return fmt.Errorf("Not valid body type value %v.", c)
+		}
+	}
+
+	if b, ok := props["body"]; ok {
+		if b1, ok1 := b.(string); ok1 {
+			hps.body = b1
+		} else {
+			return fmt.Errorf("Not valid incremental value %v, expect string.", b1)
+		}
+	}
+
+	hps.headers = make(map[string]string)
+	if h, ok := props["headers"]; ok {
+		if h1, ok1 := h.(map[string]interface{}); ok1 {
+			for k, v := range h1 {
+				if v1, ok2 := CastToString(v); ok2 {
+					hps.headers[k] = v1
+				}
+			}
+		} else {
+			return fmt.Errorf("Not valid header value %v.", h1)
+		}
+	}
+
+	common.Log.Infof("Initialized with configurations %#v.", hps)
+	return nil
+}
+
+func (hps *HTTPPullSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
+	_, e := url.Parse(hps.url)
+	if e != nil {
+		errCh <- e
+		return
+	}
+
+	hps.client = &http.Client{Timeout: time.Duration(hps.timeout) * time.Millisecond}
+	hps.initTimerPull(ctx, consumer, errCh)
+}
+
+func (hps *HTTPPullSource) Close(ctx api.StreamContext) error {
+	logger := ctx.GetLogger()
+	logger.Infof("Closing HTTP pull source")
+	return nil
+}
+
+func (hps *HTTPPullSource) initTimerPull(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
+	ticker := time.NewTicker(time.Millisecond * time.Duration(hps.interval))
+	logger := ctx.GetLogger()
+	defer ticker.Stop()
+	var omd5 string = ""
+	for {
+		select {
+		case <-ticker.C:
+			if c, e := common.Send(logger, hps.client, hps.bodyType, hps.method, hps.url, hps.headers, true, []byte(hps.body)); e != nil {
+				logger.Warnf("Found error %s when trying to reach %v ", e, hps)
+			} else {
+				if hps.incremental {
+					nmd5 := getMD5Hash(c)
+					if omd5 == nmd5 {
+						logger.Infof("Content has not changed since last fetch, so skip processing.")
+						continue
+					} else {
+						omd5 = nmd5
+					}
+				}
+
+				result := make(map[string]interface{})
+				meta := make(map[string]interface{})
+				if e := json.Unmarshal(c, &result); e != nil {
+					logger.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(c), e)
+					return
+				}
+
+				select {
+				case consumer <- api.NewDefaultSourceTuple(result, meta):
+					logger.Debugf("send data to device node")
+				}
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func getMD5Hash(text []byte) string {
+	hash := md5.Sum(text)
+	return hex.EncodeToString(hash[:])
+}

+ 21 - 0
xstream/extensions/source_util.go

@@ -0,0 +1,21 @@
+package extensions
+
+import (
+	"fmt"
+	"strconv"
+)
+
+func CastToString(v interface{}) (result string, ok bool) {
+	switch v := v.(type) {
+	case int:
+		return strconv.Itoa(v), true
+	case string:
+		return v, true
+	case bool:
+		return strconv.FormatBool(v), true
+	case float64, float32:
+		return fmt.Sprintf("%.2f", v), true
+	default:
+		return "", false
+	}
+}

+ 15 - 7
xstream/nodes/source_node.go

@@ -154,6 +154,8 @@ func doGetSource(t string) (api.Source, error) {
 	switch t {
 	case "mqtt":
 		s = &extensions.MQTTSource{}
+	case "httppull":
+		s = &extensions.HTTPPullSource{}
 	default:
 		s, err = plugins.GetSource(t)
 		if err != nil {
@@ -190,18 +192,24 @@ func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
 	conf, err := common.LoadConf(confPath)
 	props := make(map[string]interface{})
 	if err == nil {
-		cfg := make(map[string]map[string]interface{})
+		cfg := make(map[interface{}]interface{})
 		if err := yaml.Unmarshal(conf, &cfg); err != nil {
 			logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
 		} else {
-			var ok bool
-			props, ok = cfg["default"]
+			def, ok := cfg["default"]
 			if !ok {
 				logger.Warnf("default conf is not found", confkey)
-			}
-			if c, ok := cfg[confkey]; ok {
-				for k, v := range c {
-					props[k] = v
+			} else {
+				if def1, ok1 := def.(map[interface{}]interface{}); ok1 {
+					props = common.ConvertMap(def1)
+				}
+				if c, ok := cfg[confkey]; ok {
+					if c1, ok := c.(map[interface{}]interface{}); ok {
+						c2 := common.ConvertMap(c1)
+						for k, v := range c2 {
+							props[k] = v
+						}
+					}
 				}
 			}
 		}

+ 4 - 4
xstream/nodes/source_node_test.go

@@ -15,13 +15,13 @@ func TestGetConf_Apply(t *testing.T) {
 		"client":   "900",
 		"user":     "SPERF",
 		"passwd":   "PASSPASS",
-		"params": map[interface{}]interface{}{
+		"params": map[string]interface{}{
 			"QUERY_TABLE": "VBAP",
 			"ROWCOUNT":    10,
 			"FIELDS": []interface{}{
-				map[interface{}]interface{}{"FIELDNAME": "MANDT"},
-				map[interface{}]interface{}{"FIELDNAME": "VBELN"},
-				map[interface{}]interface{}{"FIELDNAME": "POSNR"},
+				map[string]interface{}{"FIELDNAME": "MANDT"},
+				map[string]interface{}{"FIELDNAME": "VBELN"},
+				map[string]interface{}{"FIELDNAME": "POSNR"},
 			},
 		},
 	}

+ 5 - 95
xstream/sinks/rest_sink.go

@@ -1,11 +1,9 @@
 package sinks
 
 import (
-	"bytes"
-	"encoding/json"
 	"fmt"
+	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xstream/api"
-	"io/ioutil"
 	"net"
 	"net/http"
 	"net/url"
@@ -25,7 +23,6 @@ type RestSink struct {
 }
 
 var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
-var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
 
 func (ms *RestSink) Configure(ps map[string]interface{}) error {
 	temp, ok := ps["method"]
@@ -62,7 +59,7 @@ func (ms *RestSink) Configure(ps map[string]interface{}) error {
 	if ok {
 		ms.headers, ok = temp.(map[string]string)
 		if !ok {
-			return fmt.Errorf("rest sink property headers %v is not a map[string][]string", temp)
+			return fmt.Errorf("rest sink property headers %v is not a map[string]string", temp)
 		}
 	}
 
@@ -74,7 +71,7 @@ func (ms *RestSink) Configure(ps map[string]interface{}) error {
 		}
 		ms.bodyType = strings.ToLower(strings.Trim(ms.bodyType, ""))
 	}
-	if _, ok = bodyTypeMap[ms.bodyType]; !ok {
+	if _, ok = common.BodyTypeMap[ms.bodyType]; !ok {
 		return fmt.Errorf("invalid property bodyType: %s, should be \"none\" or \"form\"", ms.bodyType)
 	}
 
@@ -144,95 +141,8 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 		logger.Warnf("rest sink receive non []byte data: %v", item)
 	}
 	logger.Debugf("rest sink receive %s", item)
-	return ms.send(v, logger)
-}
-
-func (ms *RestSink) send(v interface{}, logger api.Logger) error {
-	var req *http.Request
-	var err error
-	switch ms.bodyType {
-	case "none":
-		req, err = http.NewRequest(ms.method, ms.url, nil)
-		if err != nil {
-			return fmt.Errorf("fail to create request: %v", err)
-		}
-	case "json", "text", "javascript", "html", "xml":
-		var body = &(bytes.Buffer{})
-		switch t := v.(type) {
-		case []byte:
-			body = bytes.NewBuffer(t)
-		default:
-			return fmt.Errorf("invalid content: %v", v)
-		}
-		req, err = http.NewRequest(ms.method, ms.url, body)
-		if err != nil {
-			return fmt.Errorf("fail to create request: %v", err)
-		}
-		req.Header.Set("Content-Type", bodyTypeMap[ms.bodyType])
-	case "form":
-		form := url.Values{}
-		im, err := convertToMap(v, ms.sendSingle)
-		if err != nil {
-			return err
-		}
-		for key, value := range im {
-			var vstr string
-			switch value.(type) {
-			case []interface{}, map[string]interface{}:
-				if temp, err := json.Marshal(value); err != nil {
-					return fmt.Errorf("fail to parse fomr value: %v", err)
-				} else {
-					vstr = string(temp)
-				}
-			default:
-				vstr = fmt.Sprintf("%v", value)
-			}
-			form.Set(key, vstr)
-		}
-		body := ioutil.NopCloser(strings.NewReader(form.Encode()))
-		req, err = http.NewRequest(ms.method, ms.url, body)
-		if err != nil {
-			return fmt.Errorf("fail to create request: %v", err)
-		}
-		req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
-	default:
-		return fmt.Errorf("unsupported body type %s", ms.bodyType)
-	}
-
-	if len(ms.headers) > 0 {
-		for k, v := range ms.headers {
-			req.Header.Set(k, v)
-		}
-	}
-	logger.Debugf("do request: %s %s with %s", ms.method, ms.url, req.Body)
-	resp, err := ms.client.Do(req)
-	if err != nil {
-		return fmt.Errorf("rest sink fails to send out the data")
-	} else {
-		logger.Debugf("rest sink got response %v", resp)
-		if resp.StatusCode < 200 || resp.StatusCode > 299 {
-			return fmt.Errorf("rest sink fails to err http return code: %d.", resp.StatusCode)
-		}
-	}
-	return nil
-}
-
-func convertToMap(v interface{}, sendSingle bool) (map[string]interface{}, error) {
-	switch t := v.(type) {
-	case []byte:
-		r := make(map[string]interface{})
-		if err := json.Unmarshal(t, &r); err != nil {
-			if sendSingle {
-				return nil, fmt.Errorf("fail to decode content: %v", err)
-			} else {
-				r["result"] = string(t)
-			}
-		}
-		return r, nil
-	default:
-		return nil, fmt.Errorf("invalid content: %v", v)
-	}
-	return nil, fmt.Errorf("invalid content: %v", v)
+	_, e:= common.Send(logger, ms.client, ms.bodyType, ms.method, ms.url, ms.headers, ms.sendSingle, v)
+	return e
 }
 
 func (ms *RestSink) Close(ctx api.StreamContext) error {