rest_sink.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package http
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "strings"
  21. "time"
  22. "github.com/lf-edge/ekuiper/internal/pkg/cert"
  23. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. "github.com/lf-edge/ekuiper/pkg/errorx"
  26. )
  27. type RestSink struct {
  28. method string
  29. url string
  30. headers map[string]string
  31. headersTemplate string
  32. bodyType string
  33. timeout int64
  34. sendSingle bool
  35. debugResp bool
  36. insecureSkipVerify bool
  37. certificationPath string
  38. privateKeyPath string
  39. rootCaPath string
  40. client *http.Client
  41. }
  42. var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
  43. func (ms *RestSink) Configure(ps map[string]interface{}) error {
  44. temp, ok := ps["method"]
  45. if ok {
  46. ms.method, ok = temp.(string)
  47. if !ok {
  48. return fmt.Errorf("rest sink property method %v is not a string", temp)
  49. }
  50. ms.method = strings.ToUpper(strings.Trim(ms.method, ""))
  51. } else {
  52. ms.method = "GET"
  53. }
  54. if _, ok = methodsMap[ms.method]; !ok {
  55. return fmt.Errorf("invalid property method: %s", ms.method)
  56. }
  57. switch ms.method {
  58. case "GET", "HEAD":
  59. ms.bodyType = "none"
  60. default:
  61. ms.bodyType = "json"
  62. }
  63. temp, ok = ps["url"]
  64. if !ok {
  65. return fmt.Errorf("rest sink is missing property url")
  66. }
  67. ms.url, ok = temp.(string)
  68. if !ok {
  69. return fmt.Errorf("rest sink property url %v is not a string", temp)
  70. }
  71. ms.url = strings.Trim(ms.url, "")
  72. temp, ok = ps["headers"]
  73. if ok {
  74. switch h := temp.(type) {
  75. case map[string]interface{}:
  76. ms.headers = make(map[string]string)
  77. for k, v := range h {
  78. if v1, ok1 := v.(string); ok1 {
  79. ms.headers[k] = v1
  80. } else {
  81. return fmt.Errorf("header value %s for header %s is not a string", v, k)
  82. }
  83. }
  84. case string:
  85. ms.headersTemplate = h
  86. default:
  87. return fmt.Errorf("rest sink property headers %v is not a map[string]interface", temp)
  88. }
  89. }
  90. temp, ok = ps["bodyType"]
  91. if ok {
  92. ms.bodyType, ok = temp.(string)
  93. if !ok {
  94. return fmt.Errorf("rest sink property bodyType %v is not a string", temp)
  95. }
  96. ms.bodyType = strings.ToLower(strings.Trim(ms.bodyType, ""))
  97. }
  98. if _, ok = httpx.BodyTypeMap[ms.bodyType]; !ok {
  99. return fmt.Errorf("invalid property bodyType: %s, should be \"none\" or \"form\"", ms.bodyType)
  100. }
  101. temp, ok = ps["timeout"]
  102. if !ok {
  103. ms.timeout = 5000
  104. } else {
  105. to, ok := temp.(float64)
  106. if !ok {
  107. return fmt.Errorf("rest sink property timeout %v is not a number", temp)
  108. }
  109. ms.timeout = int64(to)
  110. }
  111. temp, ok = ps["sendSingle"]
  112. if !ok {
  113. ms.sendSingle = false
  114. } else {
  115. ms.sendSingle, ok = temp.(bool)
  116. if !ok {
  117. return fmt.Errorf("rest sink property sendSingle %v is not a bool", temp)
  118. }
  119. }
  120. temp, ok = ps["debugResp"]
  121. if !ok {
  122. ms.debugResp = false
  123. } else {
  124. ms.debugResp, ok = temp.(bool)
  125. if !ok {
  126. return fmt.Errorf("rest sink property debugResp %v is not a bool", temp)
  127. }
  128. }
  129. temp, ok = ps["insecureSkipVerify"]
  130. if !ok {
  131. ms.insecureSkipVerify = true
  132. } else {
  133. ms.insecureSkipVerify, ok = temp.(bool)
  134. if !ok {
  135. return fmt.Errorf("rest sink property insecureSkipVerify %v is not a bool", temp)
  136. }
  137. }
  138. if certPath, ok := ps["certificationPath"]; ok {
  139. if certPath1, ok1 := certPath.(string); ok1 {
  140. ms.certificationPath = certPath1
  141. } else {
  142. return fmt.Errorf("not valid rest sink property certificationPath value %v", certPath)
  143. }
  144. }
  145. if privPath, ok := ps["privateKeyPath"]; ok {
  146. if privPath1, ok1 := privPath.(string); ok1 {
  147. ms.privateKeyPath = privPath1
  148. } else {
  149. return fmt.Errorf("not valid rest sink property privateKeyPath value %v", privPath)
  150. }
  151. }
  152. if rootPath, ok := ps["rootCaPath"]; ok {
  153. if rootPath1, ok1 := rootPath.(string); ok1 {
  154. ms.rootCaPath = rootPath1
  155. } else {
  156. return fmt.Errorf("not valid rest sink property rootCaPath value %v", rootPath)
  157. }
  158. }
  159. return nil
  160. }
  161. func (ms *RestSink) Open(ctx api.StreamContext) error {
  162. logger := ctx.GetLogger()
  163. tlsOpts := cert.TlsConfigurationOptions{
  164. SkipCertVerify: ms.insecureSkipVerify,
  165. CertFile: ms.certificationPath,
  166. KeyFile: ms.privateKeyPath,
  167. CaFile: ms.rootCaPath,
  168. }
  169. tlscfg, err := cert.GenerateTLSForClient(tlsOpts)
  170. if err != nil {
  171. return err
  172. }
  173. tr := &http.Transport{
  174. TLSClientConfig: tlscfg,
  175. }
  176. ms.client = &http.Client{
  177. Transport: tr,
  178. Timeout: time.Duration(ms.timeout) * time.Millisecond}
  179. logger.Infof("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d, header: %v, sendSingle: %v, tls cfg: %v", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle, tlsOpts)
  180. if err := httpx.IsHttpUrl(ms.url); err != nil {
  181. return err
  182. }
  183. return nil
  184. }
  185. type MultiErrors []error
  186. func (me MultiErrors) AddError(err error) MultiErrors {
  187. me = append(me, err)
  188. return me
  189. }
  190. func (me MultiErrors) Error() string {
  191. s := make([]string, len(me))
  192. for i, v := range me {
  193. s = append(s, fmt.Sprintf("Error %d with info %s. \n", i, v))
  194. }
  195. return strings.Join(s, " ")
  196. }
  197. func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
  198. logger := ctx.GetLogger()
  199. logger.Debugf("rest sink receive %s", item)
  200. output, transed, err := ctx.TransformOutput(item)
  201. if err != nil {
  202. logger.Warnf("rest sink decode data error: %v", err)
  203. return nil
  204. }
  205. var d = item
  206. if transed {
  207. d = output
  208. }
  209. resp, err := ms.Send(ctx, d, logger)
  210. if err != nil {
  211. return fmt.Errorf("rest sink fails to send out the data: %s", err)
  212. } else {
  213. defer resp.Body.Close()
  214. logger.Debugf("rest sink got response %v", resp)
  215. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  216. if buf, bodyErr := io.ReadAll(resp.Body); bodyErr != nil {
  217. logger.Errorf("%s\n", bodyErr)
  218. return fmt.Errorf("%s: http return code: %d and error message %s", errorx.IOErr, resp.StatusCode, bodyErr)
  219. } else {
  220. logger.Errorf("%s\n", string(buf))
  221. return fmt.Errorf("%s: http return code: %d and error message %s", errorx.IOErr, resp.StatusCode, string(buf))
  222. }
  223. } else {
  224. if ms.debugResp {
  225. if buf, bodyErr := io.ReadAll(resp.Body); bodyErr != nil {
  226. logger.Errorf("%s\n", bodyErr)
  227. } else {
  228. logger.Infof("Response content: %s\n", string(buf))
  229. }
  230. }
  231. }
  232. }
  233. return nil
  234. }
  235. func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger) (*http.Response, error) {
  236. bodyType, err := ctx.ParseTemplate(ms.bodyType, v)
  237. if err != nil {
  238. return nil, err
  239. }
  240. method, err := ctx.ParseTemplate(ms.method, v)
  241. if err != nil {
  242. return nil, err
  243. }
  244. u, err := ctx.ParseTemplate(ms.url, v)
  245. if err != nil {
  246. return nil, err
  247. }
  248. var headers map[string]string
  249. if ms.headers != nil {
  250. headers = ms.headers
  251. } else if ms.headersTemplate != "" {
  252. tstr, err := ctx.ParseTemplate(ms.headersTemplate, v)
  253. if err != nil {
  254. return nil, err
  255. }
  256. err = json.Unmarshal([]byte(tstr), &headers)
  257. if err != nil {
  258. return nil, fmt.Errorf("rest sink headers template decode error: %v", err)
  259. }
  260. }
  261. return httpx.Send(logger, ms.client, bodyType, method, u, headers, ms.sendSingle, v)
  262. }
  263. func (ms *RestSink) Close(ctx api.StreamContext) error {
  264. logger := ctx.GetLogger()
  265. logger.Infof("Closing rest sink")
  266. return nil
  267. }