rest_sink.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package sinks
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "io/ioutil"
  8. "net/http"
  9. "net/url"
  10. "strings"
  11. "time"
  12. )
  13. type RestSink struct {
  14. method string
  15. url string
  16. headers map[string]string
  17. bodyType string
  18. timeout int64
  19. sendSingle bool
  20. client *http.Client
  21. }
  22. var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
  23. var bodyTypeMap = map[string]bool{"none":true, "raw": true, "form": true}
  24. func (ms *RestSink) Configure(ps map[string]interface{}) error {
  25. temp, ok := ps["method"]
  26. if ok {
  27. ms.method, ok = temp.(string)
  28. if !ok {
  29. return fmt.Errorf("rest sink property method %v is not a string", temp)
  30. }
  31. ms.method = strings.ToUpper(strings.Trim(ms.method, ""))
  32. }else{
  33. ms.method = "GET"
  34. }
  35. if _, ok = methodsMap[ms.method]; !ok {
  36. return fmt.Errorf("invalid property method: %s", ms.method)
  37. }
  38. switch ms.method{
  39. case "GET", "HEAD":
  40. ms.bodyType = "none"
  41. default:
  42. ms.bodyType = "raw"
  43. }
  44. temp, ok = ps["url"]
  45. if !ok {
  46. return fmt.Errorf("rest sink is missing property url")
  47. }
  48. ms.url, ok = temp.(string)
  49. if !ok {
  50. return fmt.Errorf("rest sink property url %v is not a string", temp)
  51. }
  52. ms.url = strings.ToLower(strings.Trim(ms.url, ""))
  53. temp, ok = ps["headers"]
  54. if ok{
  55. ms.headers, ok = temp.(map[string]string)
  56. if !ok {
  57. return fmt.Errorf("rest sink property headers %v is not a map[string][]string", temp)
  58. }
  59. }
  60. temp, ok = ps["bodyType"]
  61. if ok{
  62. ms.bodyType, ok = temp.(string)
  63. if !ok {
  64. return fmt.Errorf("rest sink property bodyType %v is not a string", temp)
  65. }
  66. ms.bodyType = strings.ToLower(strings.Trim(ms.bodyType, ""))
  67. }
  68. if _, ok = bodyTypeMap[ms.bodyType]; !ok {
  69. return fmt.Errorf("invalid property bodyType: %s, should be \"none\" or \"form\"", ms.bodyType)
  70. }
  71. temp, ok = ps["timeout"]
  72. if !ok {
  73. ms.timeout = 5000
  74. }else{
  75. to, ok := temp.(float64)
  76. if !ok {
  77. return fmt.Errorf("rest sink property timeout %v is not a number", temp)
  78. }
  79. ms.timeout = int64(to)
  80. }
  81. temp, ok = ps["sendSingle"]
  82. if !ok{
  83. ms.sendSingle = false
  84. }else{
  85. ms.sendSingle, ok = temp.(bool)
  86. if !ok {
  87. return fmt.Errorf("rest sink property sendSingle %v is not a bool", temp)
  88. }
  89. }
  90. return nil
  91. }
  92. func (ms *RestSink) Open(ctx api.StreamContext) error {
  93. logger := ctx.GetLogger()
  94. ms.client = &http.Client{Timeout: time.Duration(ms.timeout) * time.Millisecond}
  95. logger.Debugf("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle)
  96. return nil
  97. }
  98. func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
  99. logger := ctx.GetLogger()
  100. v, ok := item.([]byte)
  101. if !ok {
  102. logger.Warnf("rest sink receive non []byte data: %v", item)
  103. }
  104. logger.Debugf("rest sink receive %s", item)
  105. if !ms.sendSingle{
  106. return ms.send(v, logger)
  107. }else{
  108. var j []map[string]interface{}
  109. if err := json.Unmarshal(v, &j); err != nil {
  110. return fmt.Errorf("fail to decode the input %s as json: %v", v, err)
  111. }
  112. logger.Debugf("receive %d records", len(j))
  113. for _, r := range j{
  114. ms.send(r, logger)
  115. }
  116. }
  117. return nil
  118. }
  119. func (ms *RestSink) send(v interface{}, logger api.Logger) error {
  120. var req *http.Request
  121. var err error
  122. switch ms.bodyType {
  123. case "none":
  124. req, err = http.NewRequest(ms.method, ms.url, nil)
  125. if err != nil {
  126. return fmt.Errorf("fail to create request: %v", err)
  127. }
  128. case "raw":
  129. var content []byte
  130. switch t := v.(type) {
  131. case []byte:
  132. content = t
  133. case map[string]interface{}:
  134. content, err = json.Marshal(t)
  135. if err != nil{
  136. return fmt.Errorf("fail to encode content: %v", err)
  137. }
  138. default:
  139. return fmt.Errorf("invalid content: %v", v)
  140. }
  141. body := bytes.NewBuffer(content)
  142. req, err = http.NewRequest(ms.method, ms.url, body)
  143. if err != nil {
  144. return fmt.Errorf("fail to create request: %v", err)
  145. }
  146. req.Header.Set("Content-Type", "application/json")
  147. case "form":
  148. form := url.Values{}
  149. switch t := v.(type) {
  150. case []byte:
  151. form.Set("result", string(t))
  152. case map[string]interface{}:
  153. for key, value := range t {
  154. form.Set(key, fmt.Sprintf("%v", value))
  155. }
  156. default:
  157. return fmt.Errorf("invalid content: %v", v)
  158. }
  159. body := ioutil.NopCloser(strings.NewReader(form.Encode()))
  160. req, err = http.NewRequest(ms.method, ms.url, body)
  161. if err != nil {
  162. return fmt.Errorf("fail to create request: %v", err)
  163. }
  164. req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
  165. default:
  166. return fmt.Errorf("unsupported body type %s", ms.bodyType)
  167. }
  168. if len(ms.headers) > 0 {
  169. for k, v := range ms.headers {
  170. req.Header.Set(k, v)
  171. }
  172. }
  173. logger.Debugf("do request: %s %s with %s", ms.method, ms.url, req.Body)
  174. resp, err := ms.client.Do(req)
  175. if err != nil {
  176. return fmt.Errorf("rest sink fails to send out the data")
  177. } else {
  178. logger.Debugf("rest sink got response %v", resp)
  179. }
  180. return nil
  181. }
  182. func (ms *RestSink) Close(ctx api.StreamContext) error {
  183. logger := ctx.GetLogger()
  184. logger.Infof("Closing rest sink")
  185. return nil
  186. }