rest_sink.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. package sinks
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common/templates"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "io/ioutil"
  9. "net"
  10. "net/http"
  11. "net/url"
  12. "strings"
  13. "text/template"
  14. "time"
  15. )
  16. type RestSink struct {
  17. method string
  18. url string
  19. headers map[string]string
  20. bodyType string
  21. timeout int64
  22. sendSingle bool
  23. dataTemplate string
  24. client *http.Client
  25. tp *template.Template
  26. }
  27. var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
  28. var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
  29. func (ms *RestSink) Configure(ps map[string]interface{}) error {
  30. temp, ok := ps["method"]
  31. if ok {
  32. ms.method, ok = temp.(string)
  33. if !ok {
  34. return fmt.Errorf("rest sink property method %v is not a string", temp)
  35. }
  36. ms.method = strings.ToUpper(strings.Trim(ms.method, ""))
  37. } else {
  38. ms.method = "GET"
  39. }
  40. if _, ok = methodsMap[ms.method]; !ok {
  41. return fmt.Errorf("invalid property method: %s", ms.method)
  42. }
  43. switch ms.method {
  44. case "GET", "HEAD":
  45. ms.bodyType = "none"
  46. default:
  47. ms.bodyType = "json"
  48. }
  49. temp, ok = ps["url"]
  50. if !ok {
  51. return fmt.Errorf("rest sink is missing property url")
  52. }
  53. ms.url, ok = temp.(string)
  54. if !ok {
  55. return fmt.Errorf("rest sink property url %v is not a string", temp)
  56. }
  57. ms.url = strings.Trim(ms.url, "")
  58. temp, ok = ps["headers"]
  59. if ok {
  60. ms.headers, ok = temp.(map[string]string)
  61. if !ok {
  62. return fmt.Errorf("rest sink property headers %v is not a map[string][]string", temp)
  63. }
  64. }
  65. temp, ok = ps["bodyType"]
  66. if ok {
  67. ms.bodyType, ok = temp.(string)
  68. if !ok {
  69. return fmt.Errorf("rest sink property bodyType %v is not a string", temp)
  70. }
  71. ms.bodyType = strings.ToLower(strings.Trim(ms.bodyType, ""))
  72. }
  73. if _, ok = bodyTypeMap[ms.bodyType]; !ok {
  74. return fmt.Errorf("invalid property bodyType: %s, should be \"none\" or \"form\"", ms.bodyType)
  75. }
  76. temp, ok = ps["timeout"]
  77. if !ok {
  78. ms.timeout = 5000
  79. } else {
  80. to, ok := temp.(float64)
  81. if !ok {
  82. return fmt.Errorf("rest sink property timeout %v is not a number", temp)
  83. }
  84. ms.timeout = int64(to)
  85. }
  86. temp, ok = ps["sendSingle"]
  87. if !ok {
  88. ms.sendSingle = false
  89. } else {
  90. ms.sendSingle, ok = temp.(bool)
  91. if !ok {
  92. return fmt.Errorf("rest sink property sendSingle %v is not a bool", temp)
  93. }
  94. }
  95. temp, ok = ps["dataTemplate"]
  96. if ok {
  97. ms.dataTemplate, ok = temp.(string)
  98. if !ok {
  99. return fmt.Errorf("rest sink property dataTemplate %v is not a string", temp)
  100. }
  101. }
  102. if ms.dataTemplate != "" {
  103. funcMap := template.FuncMap{
  104. "json": templates.JsonMarshal,
  105. }
  106. temp, err := template.New("restSink").Funcs(funcMap).Parse(ms.dataTemplate)
  107. if err != nil {
  108. return fmt.Errorf("rest sink property dataTemplate %v is invalid: %v", ms.dataTemplate, err)
  109. } else {
  110. ms.tp = temp
  111. }
  112. }
  113. return nil
  114. }
  115. func (ms *RestSink) Open(ctx api.StreamContext) error {
  116. logger := ctx.GetLogger()
  117. ms.client = &http.Client{Timeout: time.Duration(ms.timeout) * time.Millisecond}
  118. logger.Infof("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v, dataTemplate: %s", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle, ms.dataTemplate)
  119. timeout := time.Duration(1 * time.Second)
  120. if u, err := url.Parse(ms.url); err != nil {
  121. return err
  122. } else {
  123. _, err := net.DialTimeout("tcp", u.Host, timeout)
  124. if err != nil {
  125. logger.Errorf("Target web server unreachable: %s", err)
  126. return err
  127. } else {
  128. logger.Infof("Target web server is available.")
  129. }
  130. }
  131. return nil
  132. }
  133. type MultiErrors []error
  134. func (me MultiErrors) AddError(err error) MultiErrors {
  135. me = append(me, err)
  136. return me
  137. }
  138. func (me MultiErrors) Error() string {
  139. s := make([]string, len(me))
  140. for i, v := range me {
  141. s = append(s, fmt.Sprintf("Error %d with info %s. \n", i, v))
  142. }
  143. return strings.Join(s, " ")
  144. }
  145. func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
  146. logger := ctx.GetLogger()
  147. v, ok := item.([]byte)
  148. if !ok {
  149. logger.Warnf("rest sink receive non []byte data: %v", item)
  150. }
  151. logger.Debugf("rest sink receive %s", item)
  152. if !ms.sendSingle {
  153. return ms.send(v, logger)
  154. } else {
  155. j, err := extractInput(v)
  156. if err != nil {
  157. return err
  158. }
  159. logger.Debugf("receive %d records", len(j))
  160. var errs MultiErrors
  161. for _, r := range j {
  162. if e := ms.send(r, logger); e != nil {
  163. errs = errs.AddError(e)
  164. }
  165. }
  166. if len(errs) != 0 {
  167. return errs
  168. }
  169. }
  170. return nil
  171. }
  172. func extractInput(v []byte) ([]map[string]interface{}, error) {
  173. var j []map[string]interface{}
  174. if err := json.Unmarshal(v, &j); err != nil {
  175. return nil, fmt.Errorf("fail to decode the input %s as json: %v", v, err)
  176. }
  177. return j, nil
  178. }
  179. func (ms *RestSink) send(v interface{}, logger api.Logger) error {
  180. var req *http.Request
  181. var err error
  182. switch ms.bodyType {
  183. case "none":
  184. req, err = http.NewRequest(ms.method, ms.url, nil)
  185. if err != nil {
  186. return fmt.Errorf("fail to create request: %v", err)
  187. }
  188. case "json", "text", "javascript", "html", "xml":
  189. var body = &(bytes.Buffer{})
  190. switch t := v.(type) {
  191. case []byte:
  192. if ms.tp != nil {
  193. j, err := extractInput(t)
  194. if err != nil {
  195. return err
  196. }
  197. err = ms.tp.Execute(body, j)
  198. if err != nil {
  199. return fmt.Errorf("fail to decode content: %v", err)
  200. }
  201. } else {
  202. body = bytes.NewBuffer(t)
  203. }
  204. case map[string]interface{}:
  205. if ms.tp != nil {
  206. err = ms.tp.Execute(body, t)
  207. if err != nil {
  208. return fmt.Errorf("fail to decode content: %v", err)
  209. }
  210. } else {
  211. content, err := json.Marshal(t)
  212. if err != nil {
  213. return fmt.Errorf("fail to decode content: %v", err)
  214. }
  215. body = bytes.NewBuffer(content)
  216. }
  217. default:
  218. return fmt.Errorf("invalid content: %v", v)
  219. }
  220. req, err = http.NewRequest(ms.method, ms.url, body)
  221. if err != nil {
  222. return fmt.Errorf("fail to create request: %v", err)
  223. }
  224. req.Header.Set("Content-Type", bodyTypeMap[ms.bodyType])
  225. case "form":
  226. form := url.Values{}
  227. im, err := convertToMap(v, ms.tp)
  228. if err != nil {
  229. return err
  230. }
  231. for key, value := range im {
  232. var vstr string
  233. switch value.(type) {
  234. case []interface{}, map[string]interface{}:
  235. if temp, err := json.Marshal(value); err != nil {
  236. return fmt.Errorf("fail to parse fomr value: %v", err)
  237. } else {
  238. vstr = string(temp)
  239. }
  240. default:
  241. vstr = fmt.Sprintf("%v", value)
  242. }
  243. form.Set(key, vstr)
  244. }
  245. body := ioutil.NopCloser(strings.NewReader(form.Encode()))
  246. req, err = http.NewRequest(ms.method, ms.url, body)
  247. if err != nil {
  248. return fmt.Errorf("fail to create request: %v", err)
  249. }
  250. req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
  251. default:
  252. return fmt.Errorf("unsupported body type %s", ms.bodyType)
  253. }
  254. if len(ms.headers) > 0 {
  255. for k, v := range ms.headers {
  256. req.Header.Set(k, v)
  257. }
  258. }
  259. logger.Debugf("do request: %s %s with %s", ms.method, ms.url, req.Body)
  260. resp, err := ms.client.Do(req)
  261. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  262. return fmt.Errorf("rest sink fails to err http return code: %d.", resp.StatusCode)
  263. }
  264. if err != nil {
  265. return fmt.Errorf("rest sink fails to send out the data")
  266. } else {
  267. logger.Debugf("rest sink got response %v", resp)
  268. }
  269. return nil
  270. }
  271. func convertToMap(v interface{}, tp *template.Template) (map[string]interface{}, error) {
  272. switch t := v.(type) {
  273. case []byte:
  274. if tp != nil {
  275. j, err := extractInput(t)
  276. if err != nil {
  277. return nil, err
  278. }
  279. var output bytes.Buffer
  280. err = tp.Execute(&output, j)
  281. if err != nil {
  282. return nil, fmt.Errorf("fail to decode content: %v", err)
  283. }
  284. r := make(map[string]interface{})
  285. if err := json.Unmarshal(output.Bytes(), &r); err != nil {
  286. return nil, fmt.Errorf("fail to decode content: %v", err)
  287. } else {
  288. return r, nil
  289. }
  290. } else {
  291. r := make(map[string]interface{})
  292. r["result"] = string(t)
  293. return r, nil
  294. }
  295. case map[string]interface{}:
  296. if tp != nil {
  297. var output bytes.Buffer
  298. err := tp.Execute(&output, t)
  299. if err != nil {
  300. return nil, fmt.Errorf("fail to decode content: %v", err)
  301. }
  302. r := make(map[string]interface{})
  303. if err := json.Unmarshal(output.Bytes(), &r); err != nil {
  304. return nil, fmt.Errorf("fail to decode content: %v", err)
  305. } else {
  306. return r, nil
  307. }
  308. } else {
  309. return t, nil
  310. }
  311. default:
  312. return nil, fmt.Errorf("invalid content: %v", v)
  313. }
  314. return nil, fmt.Errorf("invalid content: %v", v)
  315. }
  316. func (ms *RestSink) Close(ctx api.StreamContext) error {
  317. logger := ctx.GetLogger()
  318. logger.Infof("Closing rest sink")
  319. return nil
  320. }