rest_sink.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sink
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "net/url"
  21. "strings"
  22. "time"
  23. "github.com/lf-edge/ekuiper/internal/pkg/cert"
  24. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  25. "github.com/lf-edge/ekuiper/pkg/api"
  26. "github.com/lf-edge/ekuiper/pkg/errorx"
  27. )
  // RestSink is a sink that delivers data to an HTTP endpoint.
  //
  // All fields except client are filled in by Configure; client is built by
  // Open from the timeout and TLS related fields.
  type RestSink struct {
  	// method and url describe the request line; both are passed through the
  	// context's template parser before each request is sent.
  	method, url string
  	// headers holds static request headers. When it is nil, headersTemplate
  	// (if non-empty) is rendered per item and JSON-decoded into the headers.
  	headers map[string]string
  	// bodyType is validated against httpx.BodyTypeMap during Configure.
  	headersTemplate, bodyType string
  	// timeout is the HTTP client timeout in milliseconds.
  	timeout int64
  	// sendSingle is forwarded to httpx.Send; debugResp makes Collect log the
  	// body of successful responses; insecureSkipVerify is handed to the TLS
  	// configuration as SkipCertVerify.
  	sendSingle, debugResp, insecureSkipVerify bool
  	// Optional TLS material used to build the client's TLS configuration.
  	certificationPath, privateKeyPath, rootCaPath string
  	// client is the HTTP client created in Open and reused for every request.
  	client *http.Client
  }
  // methodsMap lists the HTTP methods accepted for the sink's "method"
  // property; Configure rejects any method that is not a key of this map.
  var methodsMap = map[string]bool{
  	"GET":    true,
  	"HEAD":   true,
  	"POST":   true,
  	"PUT":    true,
  	"DELETE": true,
  	"PATCH":  true,
  }
  44. func (ms *RestSink) Configure(ps map[string]interface{}) error {
  45. temp, ok := ps["method"]
  46. if ok {
  47. ms.method, ok = temp.(string)
  48. if !ok {
  49. return fmt.Errorf("rest sink property method %v is not a string", temp)
  50. }
  51. ms.method = strings.ToUpper(strings.Trim(ms.method, ""))
  52. } else {
  53. ms.method = "GET"
  54. }
  55. if _, ok = methodsMap[ms.method]; !ok {
  56. return fmt.Errorf("invalid property method: %s", ms.method)
  57. }
  58. switch ms.method {
  59. case "GET", "HEAD":
  60. ms.bodyType = "none"
  61. default:
  62. ms.bodyType = "json"
  63. }
  64. temp, ok = ps["url"]
  65. if !ok {
  66. return fmt.Errorf("rest sink is missing property url")
  67. }
  68. ms.url, ok = temp.(string)
  69. if !ok {
  70. return fmt.Errorf("rest sink property url %v is not a string", temp)
  71. }
  72. ms.url = strings.Trim(ms.url, "")
  73. temp, ok = ps["headers"]
  74. if ok {
  75. switch h := temp.(type) {
  76. case map[string]interface{}:
  77. ms.headers = make(map[string]string)
  78. for k, v := range h {
  79. if v1, ok1 := v.(string); ok1 {
  80. ms.headers[k] = v1
  81. } else {
  82. return fmt.Errorf("header value %s for header %s is not a string", v, k)
  83. }
  84. }
  85. case string:
  86. ms.headersTemplate = h
  87. default:
  88. return fmt.Errorf("rest sink property headers %v is not a map[string]interface", temp)
  89. }
  90. }
  91. temp, ok = ps["bodyType"]
  92. if ok {
  93. ms.bodyType, ok = temp.(string)
  94. if !ok {
  95. return fmt.Errorf("rest sink property bodyType %v is not a string", temp)
  96. }
  97. ms.bodyType = strings.ToLower(strings.Trim(ms.bodyType, ""))
  98. }
  99. if _, ok = httpx.BodyTypeMap[ms.bodyType]; !ok {
  100. return fmt.Errorf("invalid property bodyType: %s, should be \"none\" or \"form\"", ms.bodyType)
  101. }
  102. temp, ok = ps["timeout"]
  103. if !ok {
  104. ms.timeout = 5000
  105. } else {
  106. to, ok := temp.(float64)
  107. if !ok {
  108. return fmt.Errorf("rest sink property timeout %v is not a number", temp)
  109. }
  110. ms.timeout = int64(to)
  111. }
  112. temp, ok = ps["sendSingle"]
  113. if !ok {
  114. ms.sendSingle = false
  115. } else {
  116. ms.sendSingle, ok = temp.(bool)
  117. if !ok {
  118. return fmt.Errorf("rest sink property sendSingle %v is not a bool", temp)
  119. }
  120. }
  121. temp, ok = ps["debugResp"]
  122. if !ok {
  123. ms.debugResp = false
  124. } else {
  125. ms.debugResp, ok = temp.(bool)
  126. if !ok {
  127. return fmt.Errorf("rest sink property debugResp %v is not a bool", temp)
  128. }
  129. }
  130. temp, ok = ps["insecureSkipVerify"]
  131. if !ok {
  132. ms.insecureSkipVerify = true
  133. } else {
  134. ms.insecureSkipVerify, ok = temp.(bool)
  135. if !ok {
  136. return fmt.Errorf("rest sink property insecureSkipVerify %v is not a bool", temp)
  137. }
  138. }
  139. if certPath, ok := ps["certificationPath"]; ok {
  140. if certPath1, ok1 := certPath.(string); ok1 {
  141. ms.certificationPath = certPath1
  142. } else {
  143. return fmt.Errorf("not valid rest sink property certificationPath value %v", certPath)
  144. }
  145. }
  146. if privPath, ok := ps["privateKeyPath"]; ok {
  147. if privPath1, ok1 := privPath.(string); ok1 {
  148. ms.privateKeyPath = privPath1
  149. } else {
  150. return fmt.Errorf("not valid rest sink property privateKeyPath value %v", privPath)
  151. }
  152. }
  153. if rootPath, ok := ps["rootCaPath"]; ok {
  154. if rootPath1, ok1 := rootPath.(string); ok1 {
  155. ms.rootCaPath = rootPath1
  156. } else {
  157. return fmt.Errorf("not valid rest sink property rootCaPath value %v", rootPath)
  158. }
  159. }
  160. return nil
  161. }
  162. func (ms *RestSink) Open(ctx api.StreamContext) error {
  163. logger := ctx.GetLogger()
  164. tlsOpts := cert.TlsConfigurationOptions{
  165. SkipCertVerify: ms.insecureSkipVerify,
  166. CertFile: ms.certificationPath,
  167. KeyFile: ms.privateKeyPath,
  168. CaFile: ms.rootCaPath,
  169. }
  170. tlscfg, err := cert.GenerateTLSForClient(tlsOpts)
  171. if err != nil {
  172. return err
  173. }
  174. tr := &http.Transport{
  175. TLSClientConfig: tlscfg,
  176. }
  177. ms.client = &http.Client{
  178. Transport: tr,
  179. Timeout: time.Duration(ms.timeout) * time.Millisecond}
  180. logger.Infof("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d, header: %v, sendSingle: %v, tls cfg: %v", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle, tlsOpts)
  181. if _, err := url.Parse(ms.url); err != nil {
  182. return err
  183. }
  184. return nil
  185. }
  // MultiErrors aggregates several errors into a single error value.
  type MultiErrors []error

  // AddError returns the collection with err appended. The receiver is a
  // value, so the caller must keep the returned slice.
  func (me MultiErrors) AddError(err error) MultiErrors {
  	return append(me, err)
  }

  // Error renders every collected error as "Error <index> with info <err>. \n"
  // and joins the entries with a single space. An empty collection yields "".
  func (me MultiErrors) Error() string {
  	// Length 0 with capacity len(me): allocating len(me) elements up front
  	// and then appending would prefix the result with empty entries.
  	parts := make([]string, 0, len(me))
  	for i, e := range me {
  		parts = append(parts, fmt.Sprintf("Error %d with info %s. \n", i, e))
  	}
  	return strings.Join(parts, " ")
  }
  198. func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
  199. logger := ctx.GetLogger()
  200. logger.Debugf("rest sink receive %s", item)
  201. output, transed, err := ctx.TransformOutput(item)
  202. if err != nil {
  203. logger.Warnf("rest sink decode data error: %v", err)
  204. return nil
  205. }
  206. var d = item
  207. if transed {
  208. d = output
  209. }
  210. resp, err := ms.Send(ctx, d, logger)
  211. if err != nil {
  212. return fmt.Errorf("rest sink fails to send out the data: %s", err)
  213. } else {
  214. defer resp.Body.Close()
  215. logger.Debugf("rest sink got response %v", resp)
  216. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  217. if buf, bodyErr := io.ReadAll(resp.Body); bodyErr != nil {
  218. logger.Errorf("%s\n", bodyErr)
  219. return fmt.Errorf("%s: http return code: %d and error message %s", errorx.IOErr, resp.StatusCode, bodyErr)
  220. } else {
  221. logger.Errorf("%s\n", string(buf))
  222. return fmt.Errorf("%s: http return code: %d and error message %s", errorx.IOErr, resp.StatusCode, string(buf))
  223. }
  224. } else {
  225. if ms.debugResp {
  226. if buf, bodyErr := io.ReadAll(resp.Body); bodyErr != nil {
  227. logger.Errorf("%s\n", bodyErr)
  228. } else {
  229. logger.Infof("Response content: %s\n", string(buf))
  230. }
  231. }
  232. }
  233. }
  234. return nil
  235. }
  236. func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger) (*http.Response, error) {
  237. bodyType, err := ctx.ParseTemplate(ms.bodyType, v)
  238. if err != nil {
  239. return nil, err
  240. }
  241. method, err := ctx.ParseTemplate(ms.method, v)
  242. if err != nil {
  243. return nil, err
  244. }
  245. u, err := ctx.ParseTemplate(ms.url, v)
  246. if err != nil {
  247. return nil, err
  248. }
  249. var headers map[string]string
  250. if ms.headers != nil {
  251. headers = ms.headers
  252. } else if ms.headersTemplate != "" {
  253. tstr, err := ctx.ParseTemplate(ms.headersTemplate, v)
  254. if err != nil {
  255. return nil, err
  256. }
  257. err = json.Unmarshal([]byte(tstr), &headers)
  258. if err != nil {
  259. return nil, fmt.Errorf("rest sink headers template decode error: %v", err)
  260. }
  261. }
  262. return httpx.Send(logger, ms.client, bodyType, method, u, headers, ms.sendSingle, v)
  263. }
  264. func (ms *RestSink) Close(ctx api.StreamContext) error {
  265. logger := ctx.GetLogger()
  266. logger.Infof("Closing rest sink")
  267. return nil
  268. }