Browse Source

feat(sink): rest sink add data template support

ngjaying 5 years atrás
parent
commit
f6e7bb0f31
2 changed files with 141 additions and 23 deletions
  1. 14 0
      common/templates/funcs.go
  2. 127 23
      xstream/sinks/rest_sink.go

+ 14 - 0
common/templates/funcs.go

@@ -0,0 +1,14 @@
+package templates
+
+import (
+	"encoding/json"
+)
+
+//Use the name json in func map
+func JsonMarshal(v interface {}) (string, error) {
+	if a, err := json.Marshal(v); err != nil{
+		return "", err
+	}else{
+		return string(a), nil
+	}
+}

+ 127 - 23
xstream/sinks/rest_sink.go

@@ -4,11 +4,13 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"github.com/emqx/kuiper/common/templates"
 	"github.com/emqx/kuiper/xstream/api"
 	"io/ioutil"
 	"net/http"
 	"net/url"
 	"strings"
+	"text/template"
 	"time"
 )
 
@@ -19,12 +21,14 @@ type RestSink struct {
 	bodyType    string
 	timeout		int64
 	sendSingle  bool
+	dataTemplate string
 
 	client      *http.Client
+	tp          *template.Template
 }
 
 var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
-var bodyTypeMap = map[string]bool{"none":true, "raw": true, "form": true}
+var bodyTypeMap = map[string]string{"none":"", "text": "text/plain", "json":"application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
 
 func (ms *RestSink) Configure(ps map[string]interface{}) error {
 	temp, ok := ps["method"]
@@ -44,7 +48,7 @@ func (ms *RestSink) Configure(ps map[string]interface{}) error {
 	case "GET", "HEAD":
 		ms.bodyType = "none"
 	default:
-		ms.bodyType = "raw"
+		ms.bodyType = "json"
 	}
 
 	temp, ok = ps["url"]
@@ -98,13 +102,32 @@ func (ms *RestSink) Configure(ps map[string]interface{}) error {
 		}
 	}
 
+	temp, ok = ps["dataTemplate"]
+	if ok{
+		ms.dataTemplate, ok = temp.(string)
+		if !ok {
+			return fmt.Errorf("rest sink property dataTemplate %v is not a string", temp)
+		}
+	}
+
+	if ms.dataTemplate != ""{
+		funcMap := template.FuncMap{
+			"json": templates.JsonMarshal,
+		}
+		temp, err := template.New("restSink").Funcs(funcMap).Parse(ms.dataTemplate)
+		if err != nil{
+			return fmt.Errorf("rest sink property dataTemplate %v is invalid: %v", ms.dataTemplate, err)
+		}else{
+			ms.tp = temp
+		}
+	}
 	return nil
 }
 
 func (ms *RestSink) Open(ctx api.StreamContext) error {
 	logger := ctx.GetLogger()
 	ms.client = &http.Client{Timeout: time.Duration(ms.timeout) * time.Millisecond}
-	logger.Debugf("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle)
+	logger.Debugf("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v, dataTemplate: %s", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle, ms.dataTemplate)
 	return nil
 }
 
@@ -118,18 +141,26 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	if !ms.sendSingle{
 		return ms.send(v, logger)
 	}else{
-		var j []map[string]interface{}
-		if err := json.Unmarshal(v, &j); err != nil {
-			return fmt.Errorf("fail to decode the input %s as json: %v", v, err)
+		j, err := extractInput(v)
+		if err != nil {
+			return err
 		}
 		logger.Debugf("receive %d records", len(j))
-		for _, r := range j{
+		for _, r := range j {
 			ms.send(r, logger)
 		}
 	}
 	return nil
 }
 
+func extractInput(v []byte) ([]map[string]interface{}, error) {
+	var j []map[string]interface{}
+	if err := json.Unmarshal(v, &j); err != nil {
+		return nil, fmt.Errorf("fail to decode the input %s as json: %v", v, err)
+	}
+	return j, nil
+}
+
 func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 	var req *http.Request
 	var err error
@@ -139,36 +170,63 @@ func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 		if err != nil {
 			return fmt.Errorf("fail to create request: %v", err)
 		}
-	case "raw":
-		var content []byte
+	case "json", "text", "javascript", "html", "xml":
+		var body = &(bytes.Buffer{})
 		switch t := v.(type) {
 		case []byte:
-			content = t
+			if ms.tp != nil {
+				j, err := extractInput(t)
+				if err != nil {
+					return err
+				}
+				err = ms.tp.Execute(body, j)
+				if err != nil{
+					return fmt.Errorf("fail to decode content: %v", err)
+				}
+			}else{
+				body = bytes.NewBuffer(t)
+			}
 		case map[string]interface{}:
-			content, err = json.Marshal(t)
-			if err != nil{
-				return fmt.Errorf("fail to encode content: %v", err)
+			if ms.tp != nil{
+				err = ms.tp.Execute(body, t)
+				if err != nil{
+					return fmt.Errorf("fail to decode content: %v", err)
+				}
+			}else{
+				content, err := json.Marshal(t)
+				if err != nil{
+					return fmt.Errorf("fail to decode content: %v", err)
+				}
+				body = bytes.NewBuffer(content)
 			}
 		default:
 			return fmt.Errorf("invalid content: %v", v)
 		}
-		body := bytes.NewBuffer(content)
+
 		req, err = http.NewRequest(ms.method, ms.url, body)
 		if err != nil {
 			return fmt.Errorf("fail to create request: %v", err)
 		}
-		req.Header.Set("Content-Type", "application/json")
+		req.Header.Set("Content-Type", bodyTypeMap[ms.bodyType])
 	case "form":
 		form := url.Values{}
-		switch t := v.(type) {
-		case []byte:
-			form.Set("result", string(t))
-		case map[string]interface{}:
-			for key, value := range t {
-				form.Set(key, fmt.Sprintf("%v", value))
+		im, err := convertToMap(v, ms.tp)
+		if err != nil {
+			return err
+		}
+		for key, value := range im {
+			var vstr string
+			switch value.(type) {
+			case []interface{}, map[string]interface{}:
+				if temp, err := json.Marshal(value); err != nil {
+					return fmt.Errorf("fail to parse fomr value: %v", err)
+				}else{
+					vstr = string(temp)
+				}
+			default:
+				vstr = fmt.Sprintf("%v", value)
 			}
-		default:
-			return fmt.Errorf("invalid content: %v", v)
+			form.Set(key, vstr)
 		}
 		body := ioutil.NopCloser(strings.NewReader(form.Encode()))
 		req, err = http.NewRequest(ms.method, ms.url, body)
@@ -195,6 +253,52 @@ func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 	return nil
 }
 
+func convertToMap(v interface{}, tp *template.Template) (map[string]interface{}, error) {
+	switch t := v.(type) {
+	case []byte:
+		if tp != nil{
+			j, err := extractInput(t)
+			if err != nil {
+				return nil, err
+			}
+			var output bytes.Buffer
+			err = tp.Execute(&output, j)
+			if err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}
+			r := make(map[string]interface{})
+			if err := json.Unmarshal(output.Bytes(), &r); err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}else{
+				return r, nil
+			}
+		}else{
+			r := make(map[string]interface{})
+			r["result"] = string(t)
+			return r, nil
+		}
+	case map[string]interface{}:
+		if tp != nil{
+			var output bytes.Buffer
+			err := tp.Execute(&output, t)
+			if err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}
+			r := make(map[string]interface{})
+			if err := json.Unmarshal(output.Bytes(), &r); err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}else{
+				return r, nil
+			}
+		}else{
+			return t, nil
+		}
+	default:
+		return nil, fmt.Errorf("invalid content: %v", v)
+	}
+	return nil, fmt.Errorf("invalid content: %v", v)
+}
+
 func (ms *RestSink) Close(ctx api.StreamContext) error {
 	logger := ctx.GetLogger()
 	logger.Infof("Closing rest sink")