浏览代码

Merge pull request #52 from emqx/jiyong

Add dataTemplate to rest sink
jinfahua 5 年之前
父节点
当前提交
dd84049b80
共有 5 个文件被更改,包括 609 次插入27 次删除
  1. 14 0
      common/templates/funcs.go
  2. 48 2
      docs/en_US/rules/sinks/rest.md
  3. 48 2
      docs/zh_CN/rules/sinks/rest.md
  4. 127 23
      xstream/sinks/rest_sink.go
  5. 372 0
      xstream/sinks/rest_sink_test.go

+ 14 - 0
common/templates/funcs.go

@@ -0,0 +1,14 @@
+package templates
+
+import (
+	"encoding/json"
+)
+
+//Use the name json in func map
+func JsonMarshal(v interface {}) (string, error) {
+	if a, err := json.Marshal(v); err != nil{
+		return "", err
+	}else{
+		return string(a), nil
+	}
+}

文件差异内容过多而无法显示
+ 48 - 2
docs/en_US/rules/sinks/rest.md


文件差异内容过多而无法显示
+ 48 - 2
docs/zh_CN/rules/sinks/rest.md


+ 127 - 23
xstream/sinks/rest_sink.go

@@ -4,11 +4,13 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"github.com/emqx/kuiper/common/templates"
 	"github.com/emqx/kuiper/xstream/api"
 	"io/ioutil"
 	"net/http"
 	"net/url"
 	"strings"
+	"text/template"
 	"time"
 )
 
@@ -19,12 +21,14 @@ type RestSink struct {
 	bodyType    string
 	timeout		int64
 	sendSingle  bool
+	dataTemplate string
 
 	client      *http.Client
+	tp          *template.Template
 }
 
 var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
-var bodyTypeMap = map[string]bool{"none":true, "raw": true, "form": true}
+var bodyTypeMap = map[string]string{"none":"", "text": "text/plain", "json":"application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
 
 func (ms *RestSink) Configure(ps map[string]interface{}) error {
 	temp, ok := ps["method"]
@@ -44,7 +48,7 @@ func (ms *RestSink) Configure(ps map[string]interface{}) error {
 	case "GET", "HEAD":
 		ms.bodyType = "none"
 	default:
-		ms.bodyType = "raw"
+		ms.bodyType = "json"
 	}
 
 	temp, ok = ps["url"]
@@ -98,13 +102,32 @@ func (ms *RestSink) Configure(ps map[string]interface{}) error {
 		}
 	}
 
+	temp, ok = ps["dataTemplate"]
+	if ok{
+		ms.dataTemplate, ok = temp.(string)
+		if !ok {
+			return fmt.Errorf("rest sink property dataTemplate %v is not a string", temp)
+		}
+	}
+
+	if ms.dataTemplate != ""{
+		funcMap := template.FuncMap{
+			"json": templates.JsonMarshal,
+		}
+		temp, err := template.New("restSink").Funcs(funcMap).Parse(ms.dataTemplate)
+		if err != nil{
+			return fmt.Errorf("rest sink property dataTemplate %v is invalid: %v", ms.dataTemplate, err)
+		}else{
+			ms.tp = temp
+		}
+	}
 	return nil
 }
 
 func (ms *RestSink) Open(ctx api.StreamContext) error {
 	logger := ctx.GetLogger()
 	ms.client = &http.Client{Timeout: time.Duration(ms.timeout) * time.Millisecond}
-	logger.Debugf("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle)
+	logger.Debugf("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v, dataTemplate: %s", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle, ms.dataTemplate)
 	return nil
 }
 
@@ -118,18 +141,26 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	if !ms.sendSingle{
 		return ms.send(v, logger)
 	}else{
-		var j []map[string]interface{}
-		if err := json.Unmarshal(v, &j); err != nil {
-			return fmt.Errorf("fail to decode the input %s as json: %v", v, err)
+		j, err := extractInput(v)
+		if err != nil {
+			return err
 		}
 		logger.Debugf("receive %d records", len(j))
-		for _, r := range j{
+		for _, r := range j {
 			ms.send(r, logger)
 		}
 	}
 	return nil
 }
 
+func extractInput(v []byte) ([]map[string]interface{}, error) {
+	var j []map[string]interface{}
+	if err := json.Unmarshal(v, &j); err != nil {
+		return nil, fmt.Errorf("fail to decode the input %s as json: %v", v, err)
+	}
+	return j, nil
+}
+
 func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 	var req *http.Request
 	var err error
@@ -139,36 +170,63 @@ func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 		if err != nil {
 			return fmt.Errorf("fail to create request: %v", err)
 		}
-	case "raw":
-		var content []byte
+	case "json", "text", "javascript", "html", "xml":
+		var body = &(bytes.Buffer{})
 		switch t := v.(type) {
 		case []byte:
-			content = t
+			if ms.tp != nil {
+				j, err := extractInput(t)
+				if err != nil {
+					return err
+				}
+				err = ms.tp.Execute(body, j)
+				if err != nil{
+					return fmt.Errorf("fail to decode content: %v", err)
+				}
+			}else{
+				body = bytes.NewBuffer(t)
+			}
 		case map[string]interface{}:
-			content, err = json.Marshal(t)
-			if err != nil{
-				return fmt.Errorf("fail to encode content: %v", err)
+			if ms.tp != nil{
+				err = ms.tp.Execute(body, t)
+				if err != nil{
+					return fmt.Errorf("fail to decode content: %v", err)
+				}
+			}else{
+				content, err := json.Marshal(t)
+				if err != nil{
+					return fmt.Errorf("fail to decode content: %v", err)
+				}
+				body = bytes.NewBuffer(content)
 			}
 		default:
 			return fmt.Errorf("invalid content: %v", v)
 		}
-		body := bytes.NewBuffer(content)
+
 		req, err = http.NewRequest(ms.method, ms.url, body)
 		if err != nil {
 			return fmt.Errorf("fail to create request: %v", err)
 		}
-		req.Header.Set("Content-Type", "application/json")
+		req.Header.Set("Content-Type", bodyTypeMap[ms.bodyType])
 	case "form":
 		form := url.Values{}
-		switch t := v.(type) {
-		case []byte:
-			form.Set("result", string(t))
-		case map[string]interface{}:
-			for key, value := range t {
-				form.Set(key, fmt.Sprintf("%v", value))
+		im, err := convertToMap(v, ms.tp)
+		if err != nil {
+			return err
+		}
+		for key, value := range im {
+			var vstr string
+			switch value.(type) {
+			case []interface{}, map[string]interface{}:
+				if temp, err := json.Marshal(value); err != nil {
+					return fmt.Errorf("fail to parse fomr value: %v", err)
+				}else{
+					vstr = string(temp)
+				}
+			default:
+				vstr = fmt.Sprintf("%v", value)
 			}
-		default:
-			return fmt.Errorf("invalid content: %v", v)
+			form.Set(key, vstr)
 		}
 		body := ioutil.NopCloser(strings.NewReader(form.Encode()))
 		req, err = http.NewRequest(ms.method, ms.url, body)
@@ -195,6 +253,52 @@ func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 	return nil
 }
 
+func convertToMap(v interface{}, tp *template.Template) (map[string]interface{}, error) {
+	switch t := v.(type) {
+	case []byte:
+		if tp != nil{
+			j, err := extractInput(t)
+			if err != nil {
+				return nil, err
+			}
+			var output bytes.Buffer
+			err = tp.Execute(&output, j)
+			if err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}
+			r := make(map[string]interface{})
+			if err := json.Unmarshal(output.Bytes(), &r); err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}else{
+				return r, nil
+			}
+		}else{
+			r := make(map[string]interface{})
+			r["result"] = string(t)
+			return r, nil
+		}
+	case map[string]interface{}:
+		if tp != nil{
+			var output bytes.Buffer
+			err := tp.Execute(&output, t)
+			if err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}
+			r := make(map[string]interface{})
+			if err := json.Unmarshal(output.Bytes(), &r); err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}else{
+				return r, nil
+			}
+		}else{
+			return t, nil
+		}
+	default:
+		return nil, fmt.Errorf("invalid content: %v", v)
+	}
+	return nil, fmt.Errorf("invalid content: %v", v)
+}
+
 func (ms *RestSink) Close(ctx api.StreamContext) error {
 	logger := ctx.GetLogger()
 	logger.Infof("Closing rest sink")

+ 372 - 0
xstream/sinks/rest_sink_test.go

@@ -0,0 +1,372 @@
+package sinks
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/contexts"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"reflect"
+	"testing"
+)
+
+type request struct{
+	Method string
+	Body   string
+	ContentType string
+}
+
+func TestRestSink_Apply(t *testing.T) {
+	var tests = []struct {
+		config  map[string]interface{}
+		data 	[]map[string]interface{}
+		result  []request
+	}{
+		{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"sendSingle": true,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				Body: `{"ab":"hello1"}`,
+				ContentType: "application/json",
+			},{
+				Method: "POST",
+				Body: `{"ab":"hello2"}`,
+				ContentType: "application/json",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
+				ContentType: "application/json",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "get",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "GET",
+				ContentType: "",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "put",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "text",
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "PUT",
+				ContentType: "text/plain",
+				Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "form",
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `result=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "form",
+				"sendSingle": true,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `ab=hello1`,
+			},{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `ab=hello2`,
+			}},
+		}, {
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "json",
+				"sendSingle": true,
+				"timeout": float64(1000),
+			},
+			data: []map[string]interface{}{{
+				"ab": "hello1",
+			}, {
+				"ab": "hello2",
+			}},
+			result: []request{{
+				Method:      "POST",
+				Body:        `{"ab":"hello1"}`,
+				ContentType: "application/json",
+			}, {
+				Method:      "POST",
+				Body:        `{"ab":"hello2"}`,
+				ContentType: "application/json",
+			}},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestRestSink_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+
+	var requests []request
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request){
+		body, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			fmt.Printf("Error reading body: %v", err)
+			http.Error(w, "can't read body", http.StatusBadRequest)
+			return
+		}
+
+		requests = append(requests, request{
+			Method: r.Method,
+			Body: string(body),
+			ContentType: r.Header.Get("Content-Type"),
+		})
+		contextLogger.Debugf(string(body))
+		fmt.Fprintf(w, string(body))
+	}))
+	defer ts.Close()
+	for i, tt := range tests {
+		requests = nil
+		s := &RestSink{}
+		tt.config["url"] = ts.URL
+		s.Configure(tt.config)
+		s.Open(ctx)
+		input, err := json.Marshal(tt.data)
+		if err != nil{
+			t.Errorf("Failed to parse the input into []byte]")
+			continue
+		}
+		s.Collect(ctx, input)
+		s.Close(ctx)
+		if !reflect.DeepEqual(tt.result, requests) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
+		}
+	}
+}
+
+func TestRestSinkTemplate_Apply(t *testing.T) {
+	var tests = []struct {
+		config  map[string]interface{}
+		data 	[]map[string]interface{}
+		result  []request
+	}{
+		{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"sendSingle": true,
+				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				Body: `{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`,
+				ContentType: "application/json",
+			},{
+				Method: "POST",
+				Body: `{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`,
+				ContentType: "application/json",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				Body: `{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`,
+				ContentType: "application/json",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "get",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "GET",
+				ContentType: "",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "put",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "html",
+				"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "PUT",
+				ContentType: "text/html",
+				Body: `<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`,
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "form",
+				"dataTemplate": `{"content":{{json .}}}`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `content=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "form",
+				"sendSingle": true,
+				"dataTemplate": `{"newab":"{{.ab}}"}`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `newab=hello1`,
+			},{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `newab=hello2`,
+			}},
+		}, {
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "json",
+				"sendSingle": true,
+				"timeout": float64(1000),
+				"dataTemplate": `{"newab":"{{.ab}}"}`,
+			},
+			data: []map[string]interface{}{{
+				"ab": "hello1",
+			}, {
+				"ab": "hello2",
+			}},
+			result: []request{{
+				Method:      "POST",
+				Body:        `{"newab":"hello1"}`,
+				ContentType: "application/json",
+			}, {
+				Method:      "POST",
+				Body:        `{"newab":"hello2"}`,
+				ContentType: "application/json",
+			}},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestRestSink_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+
+	var requests []request
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request){
+		body, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			fmt.Printf("Error reading body: %v", err)
+			http.Error(w, "can't read body", http.StatusBadRequest)
+			return
+		}
+
+		requests = append(requests, request{
+			Method: r.Method,
+			Body: string(body),
+			ContentType: r.Header.Get("Content-Type"),
+		})
+		contextLogger.Debugf(string(body))
+		fmt.Fprintf(w, string(body))
+	}))
+	defer ts.Close()
+	for i, tt := range tests {
+		requests = nil
+		s := &RestSink{}
+		tt.config["url"] = ts.URL
+		s.Configure(tt.config)
+		s.Open(ctx)
+		input, err := json.Marshal(tt.data)
+		if err != nil{
+			t.Errorf("Failed to parse the input into []byte]")
+			continue
+		}
+		s.Collect(ctx, input)
+		s.Close(ctx)
+		if !reflect.DeepEqual(tt.result, requests) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
+		}
+	}
+}