rest_sink.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package sinks
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "io/ioutil"
  8. "net/http"
  9. "net/url"
  10. "strings"
  11. "time"
  12. )
  13. type RestSink struct {
  14. method string
  15. url string
  16. headers map[string]string
  17. bodyType string
  18. timeout int64
  19. sendSingle bool
  20. debugResp bool
  21. insecureSkipVerify bool
  22. client *http.Client
  23. }
  24. var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
  25. func (ms *RestSink) Configure(ps map[string]interface{}) error {
  26. temp, ok := ps["method"]
  27. if ok {
  28. ms.method, ok = temp.(string)
  29. if !ok {
  30. return fmt.Errorf("rest sink property method %v is not a string", temp)
  31. }
  32. ms.method = strings.ToUpper(strings.Trim(ms.method, ""))
  33. } else {
  34. ms.method = "GET"
  35. }
  36. if _, ok = methodsMap[ms.method]; !ok {
  37. return fmt.Errorf("invalid property method: %s", ms.method)
  38. }
  39. switch ms.method {
  40. case "GET", "HEAD":
  41. ms.bodyType = "none"
  42. default:
  43. ms.bodyType = "json"
  44. }
  45. temp, ok = ps["url"]
  46. if !ok {
  47. return fmt.Errorf("rest sink is missing property url")
  48. }
  49. ms.url, ok = temp.(string)
  50. if !ok {
  51. return fmt.Errorf("rest sink property url %v is not a string", temp)
  52. }
  53. ms.url = strings.Trim(ms.url, "")
  54. temp, ok = ps["headers"]
  55. if ok {
  56. ms.headers = make(map[string]string)
  57. if m, ok := temp.(map[string]interface{}); ok {
  58. for k, v := range m {
  59. if v1, ok1 := v.(string); ok1 {
  60. ms.headers[k] = v1
  61. } else {
  62. return fmt.Errorf("header value %s for header %s is not a string", v, k)
  63. }
  64. }
  65. } else {
  66. return fmt.Errorf("rest sink property headers %v is not a map[string]interface", temp)
  67. }
  68. }
  69. temp, ok = ps["bodyType"]
  70. if ok {
  71. ms.bodyType, ok = temp.(string)
  72. if !ok {
  73. return fmt.Errorf("rest sink property bodyType %v is not a string", temp)
  74. }
  75. ms.bodyType = strings.ToLower(strings.Trim(ms.bodyType, ""))
  76. }
  77. if _, ok = common.BodyTypeMap[ms.bodyType]; !ok {
  78. return fmt.Errorf("invalid property bodyType: %s, should be \"none\" or \"form\"", ms.bodyType)
  79. }
  80. temp, ok = ps["timeout"]
  81. if !ok {
  82. ms.timeout = 5000
  83. } else {
  84. to, ok := temp.(float64)
  85. if !ok {
  86. return fmt.Errorf("rest sink property timeout %v is not a number", temp)
  87. }
  88. ms.timeout = int64(to)
  89. }
  90. temp, ok = ps["sendSingle"]
  91. if !ok {
  92. ms.sendSingle = false
  93. } else {
  94. ms.sendSingle, ok = temp.(bool)
  95. if !ok {
  96. return fmt.Errorf("rest sink property sendSingle %v is not a bool", temp)
  97. }
  98. }
  99. temp, ok = ps["debugResp"]
  100. if !ok {
  101. ms.debugResp = false
  102. } else {
  103. ms.debugResp, ok = temp.(bool)
  104. if !ok {
  105. return fmt.Errorf("rest sink property debugResp %v is not a bool", temp)
  106. }
  107. }
  108. temp, ok = ps["insecureSkipVerify"]
  109. if !ok {
  110. ms.insecureSkipVerify = true
  111. } else {
  112. ms.insecureSkipVerify, ok = temp.(bool)
  113. if !ok {
  114. return fmt.Errorf("rest sink property insecureSkipVerify %v is not a bool", temp)
  115. }
  116. }
  117. return nil
  118. }
  119. func (ms *RestSink) Open(ctx api.StreamContext) error {
  120. logger := ctx.GetLogger()
  121. tr := &http.Transport{
  122. TLSClientConfig: &tls.Config{InsecureSkipVerify: ms.insecureSkipVerify},
  123. }
  124. ms.client = &http.Client{
  125. Transport: tr,
  126. Timeout: time.Duration(ms.timeout) * time.Millisecond}
  127. logger.Infof("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v, insecureSkipVerify: %v", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle, ms.insecureSkipVerify)
  128. if _, err := url.Parse(ms.url); err != nil {
  129. return err
  130. }
  131. return nil
  132. }
  133. type MultiErrors []error
  134. func (me MultiErrors) AddError(err error) MultiErrors {
  135. me = append(me, err)
  136. return me
  137. }
  138. func (me MultiErrors) Error() string {
  139. s := make([]string, len(me))
  140. for i, v := range me {
  141. s = append(s, fmt.Sprintf("Error %d with info %s. \n", i, v))
  142. }
  143. return strings.Join(s, " ")
  144. }
  145. func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
  146. logger := ctx.GetLogger()
  147. v, ok := item.([]byte)
  148. if !ok {
  149. logger.Warnf("rest sink receive non []byte data: %v", item)
  150. }
  151. logger.Debugf("rest sink receive %s", item)
  152. resp, err := ms.Send(v, logger)
  153. if err != nil {
  154. return fmt.Errorf("rest sink fails to send out the data: %s", err)
  155. } else {
  156. logger.Debugf("rest sink got response %v", resp)
  157. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  158. buf, _ := ioutil.ReadAll(resp.Body)
  159. logger.Errorf("%s\n", string(buf))
  160. return fmt.Errorf("rest sink fails to err http return code: %d and error message %s.", resp.StatusCode, string(buf))
  161. } else {
  162. if ms.debugResp {
  163. if buf, bodyErr := ioutil.ReadAll(resp.Body); bodyErr != nil {
  164. logger.Errorf("%s\n", bodyErr)
  165. } else {
  166. logger.Infof("Response content: %s\n", string(buf))
  167. }
  168. }
  169. }
  170. }
  171. return nil
  172. }
  173. func (ms *RestSink) Send(v interface{}, logger api.Logger) (*http.Response, error) {
  174. return common.Send(logger, ms.client, ms.bodyType, ms.method, ms.url, ms.headers, ms.sendSingle, v)
  175. }
  176. func (ms *RestSink) Close(ctx api.StreamContext) error {
  177. logger := ctx.GetLogger()
  178. logger.Infof("Closing rest sink")
  179. return nil
  180. }