rest_sink.go 7.7 KB


  1. package sinks
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common/templates"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "io/ioutil"
  9. "net/http"
  10. "net/url"
  11. "strings"
  12. "text/template"
  13. "time"
  14. )
  // RestSink is a sink that forwards the data it collects to an HTTP endpoint.
  //
  // All fields are populated by Configure, except client, which is created by
  // Open.
  type RestSink struct {
  	// method is the HTTP method; url is the request target.
  	method, url string
  	// headers are extra request headers set on every outgoing request.
  	headers map[string]string
  	// bodyType is one of the keys of bodyTypeMap and selects how the request
  	// body is built.
  	bodyType string
  	// timeout is the HTTP client timeout; Open interprets it as milliseconds.
  	timeout int64
  	// sendSingle makes Collect decode the input as a JSON array and send one
  	// request per element instead of one request for the whole input.
  	sendSingle bool
  	// dataTemplate is the optional text/template source used to format the
  	// data before sending; tp is its parsed form (nil when no template is set).
  	dataTemplate string

  	client *http.Client
  	tp     *template.Template
  }
  var (
  	// methodsMap is the set of HTTP methods accepted by Configure.
  	methodsMap = map[string]bool{
  		"GET":    true,
  		"HEAD":   true,
  		"POST":   true,
  		"PUT":    true,
  		"DELETE": true,
  		"PATCH":  true,
  	}

  	// bodyTypeMap maps each accepted bodyType to the Content-Type value used
  	// for it. "none" and "form" map to the empty string; send handles those
  	// two body types separately.
  	bodyTypeMap = map[string]string{
  		"none":       "",
  		"text":       "text/plain",
  		"json":       "application/json",
  		"html":       "text/html",
  		"xml":        "application/xml",
  		"javascript": "application/javascript",
  		"form":       "",
  	}
  )
  28. func (ms *RestSink) Configure(ps map[string]interface{}) error {
  29. temp, ok := ps["method"]
  30. if ok {
  31. ms.method, ok = temp.(string)
  32. if !ok {
  33. return fmt.Errorf("rest sink property method %v is not a string", temp)
  34. }
  35. ms.method = strings.ToUpper(strings.Trim(ms.method, ""))
  36. }else{
  37. ms.method = "GET"
  38. }
  39. if _, ok = methodsMap[ms.method]; !ok {
  40. return fmt.Errorf("invalid property method: %s", ms.method)
  41. }
  42. switch ms.method{
  43. case "GET", "HEAD":
  44. ms.bodyType = "none"
  45. default:
  46. ms.bodyType = "json"
  47. }
  48. temp, ok = ps["url"]
  49. if !ok {
  50. return fmt.Errorf("rest sink is missing property url")
  51. }
  52. ms.url, ok = temp.(string)
  53. if !ok {
  54. return fmt.Errorf("rest sink property url %v is not a string", temp)
  55. }
  56. ms.url = strings.ToLower(strings.Trim(ms.url, ""))
  57. temp, ok = ps["headers"]
  58. if ok{
  59. ms.headers, ok = temp.(map[string]string)
  60. if !ok {
  61. return fmt.Errorf("rest sink property headers %v is not a map[string][]string", temp)
  62. }
  63. }
  64. temp, ok = ps["bodyType"]
  65. if ok{
  66. ms.bodyType, ok = temp.(string)
  67. if !ok {
  68. return fmt.Errorf("rest sink property bodyType %v is not a string", temp)
  69. }
  70. ms.bodyType = strings.ToLower(strings.Trim(ms.bodyType, ""))
  71. }
  72. if _, ok = bodyTypeMap[ms.bodyType]; !ok {
  73. return fmt.Errorf("invalid property bodyType: %s, should be \"none\" or \"form\"", ms.bodyType)
  74. }
  75. temp, ok = ps["timeout"]
  76. if !ok {
  77. ms.timeout = 5000
  78. }else{
  79. to, ok := temp.(float64)
  80. if !ok {
  81. return fmt.Errorf("rest sink property timeout %v is not a number", temp)
  82. }
  83. ms.timeout = int64(to)
  84. }
  85. temp, ok = ps["sendSingle"]
  86. if !ok{
  87. ms.sendSingle = false
  88. }else{
  89. ms.sendSingle, ok = temp.(bool)
  90. if !ok {
  91. return fmt.Errorf("rest sink property sendSingle %v is not a bool", temp)
  92. }
  93. }
  94. temp, ok = ps["dataTemplate"]
  95. if ok{
  96. ms.dataTemplate, ok = temp.(string)
  97. if !ok {
  98. return fmt.Errorf("rest sink property dataTemplate %v is not a string", temp)
  99. }
  100. }
  101. if ms.dataTemplate != ""{
  102. funcMap := template.FuncMap{
  103. "json": templates.JsonMarshal,
  104. }
  105. temp, err := template.New("restSink").Funcs(funcMap).Parse(ms.dataTemplate)
  106. if err != nil{
  107. return fmt.Errorf("rest sink property dataTemplate %v is invalid: %v", ms.dataTemplate, err)
  108. }else{
  109. ms.tp = temp
  110. }
  111. }
  112. return nil
  113. }
  114. func (ms *RestSink) Open(ctx api.StreamContext) error {
  115. logger := ctx.GetLogger()
  116. ms.client = &http.Client{Timeout: time.Duration(ms.timeout) * time.Millisecond}
  117. logger.Debugf("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v, dataTemplate: %s", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle, ms.dataTemplate)
  118. return nil
  119. }
  120. func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
  121. logger := ctx.GetLogger()
  122. v, ok := item.([]byte)
  123. if !ok {
  124. logger.Warnf("rest sink receive non []byte data: %v", item)
  125. }
  126. logger.Debugf("rest sink receive %s", item)
  127. if !ms.sendSingle{
  128. return ms.send(v, logger)
  129. }else{
  130. j, err := extractInput(v)
  131. if err != nil {
  132. return err
  133. }
  134. logger.Debugf("receive %d records", len(j))
  135. for _, r := range j {
  136. ms.send(r, logger)
  137. }
  138. }
  139. return nil
  140. }
  // extractInput decodes v as a JSON array of objects.
  //
  // It returns the decoded records, or a nil slice and an error that embeds
  // the raw input when v is not valid JSON of that shape.
  func extractInput(v []byte) ([]map[string]interface{}, error) {
  	records := []map[string]interface{}(nil)
  	decodeErr := json.Unmarshal(v, &records)
  	if decodeErr == nil {
  		return records, nil
  	}
  	return nil, fmt.Errorf("fail to decode the input %s as json: %v", v, decodeErr)
  }
  148. func (ms *RestSink) send(v interface{}, logger api.Logger) error {
  149. var req *http.Request
  150. var err error
  151. switch ms.bodyType {
  152. case "none":
  153. req, err = http.NewRequest(ms.method, ms.url, nil)
  154. if err != nil {
  155. return fmt.Errorf("fail to create request: %v", err)
  156. }
  157. case "json", "text", "javascript", "html", "xml":
  158. var body = &(bytes.Buffer{})
  159. switch t := v.(type) {
  160. case []byte:
  161. if ms.tp != nil {
  162. j, err := extractInput(t)
  163. if err != nil {
  164. return err
  165. }
  166. err = ms.tp.Execute(body, j)
  167. if err != nil{
  168. return fmt.Errorf("fail to decode content: %v", err)
  169. }
  170. }else{
  171. body = bytes.NewBuffer(t)
  172. }
  173. case map[string]interface{}:
  174. if ms.tp != nil{
  175. err = ms.tp.Execute(body, t)
  176. if err != nil{
  177. return fmt.Errorf("fail to decode content: %v", err)
  178. }
  179. }else{
  180. content, err := json.Marshal(t)
  181. if err != nil{
  182. return fmt.Errorf("fail to decode content: %v", err)
  183. }
  184. body = bytes.NewBuffer(content)
  185. }
  186. default:
  187. return fmt.Errorf("invalid content: %v", v)
  188. }
  189. req, err = http.NewRequest(ms.method, ms.url, body)
  190. if err != nil {
  191. return fmt.Errorf("fail to create request: %v", err)
  192. }
  193. req.Header.Set("Content-Type", bodyTypeMap[ms.bodyType])
  194. case "form":
  195. form := url.Values{}
  196. im, err := convertToMap(v, ms.tp)
  197. if err != nil {
  198. return err
  199. }
  200. for key, value := range im {
  201. var vstr string
  202. switch value.(type) {
  203. case []interface{}, map[string]interface{}:
  204. if temp, err := json.Marshal(value); err != nil {
  205. return fmt.Errorf("fail to parse fomr value: %v", err)
  206. }else{
  207. vstr = string(temp)
  208. }
  209. default:
  210. vstr = fmt.Sprintf("%v", value)
  211. }
  212. form.Set(key, vstr)
  213. }
  214. body := ioutil.NopCloser(strings.NewReader(form.Encode()))
  215. req, err = http.NewRequest(ms.method, ms.url, body)
  216. if err != nil {
  217. return fmt.Errorf("fail to create request: %v", err)
  218. }
  219. req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
  220. default:
  221. return fmt.Errorf("unsupported body type %s", ms.bodyType)
  222. }
  223. if len(ms.headers) > 0 {
  224. for k, v := range ms.headers {
  225. req.Header.Set(k, v)
  226. }
  227. }
  228. logger.Debugf("do request: %s %s with %s", ms.method, ms.url, req.Body)
  229. resp, err := ms.client.Do(req)
  230. if err != nil {
  231. return fmt.Errorf("rest sink fails to send out the data")
  232. } else {
  233. logger.Debugf("rest sink got response %v", resp)
  234. }
  235. return nil
  236. }
  237. func convertToMap(v interface{}, tp *template.Template) (map[string]interface{}, error) {
  238. switch t := v.(type) {
  239. case []byte:
  240. if tp != nil{
  241. j, err := extractInput(t)
  242. if err != nil {
  243. return nil, err
  244. }
  245. var output bytes.Buffer
  246. err = tp.Execute(&output, j)
  247. if err != nil{
  248. return nil, fmt.Errorf("fail to decode content: %v", err)
  249. }
  250. r := make(map[string]interface{})
  251. if err := json.Unmarshal(output.Bytes(), &r); err != nil{
  252. return nil, fmt.Errorf("fail to decode content: %v", err)
  253. }else{
  254. return r, nil
  255. }
  256. }else{
  257. r := make(map[string]interface{})
  258. r["result"] = string(t)
  259. return r, nil
  260. }
  261. case map[string]interface{}:
  262. if tp != nil{
  263. var output bytes.Buffer
  264. err := tp.Execute(&output, t)
  265. if err != nil{
  266. return nil, fmt.Errorf("fail to decode content: %v", err)
  267. }
  268. r := make(map[string]interface{})
  269. if err := json.Unmarshal(output.Bytes(), &r); err != nil{
  270. return nil, fmt.Errorf("fail to decode content: %v", err)
  271. }else{
  272. return r, nil
  273. }
  274. }else{
  275. return t, nil
  276. }
  277. default:
  278. return nil, fmt.Errorf("invalid content: %v", v)
  279. }
  280. return nil, fmt.Errorf("invalid content: %v", v)
  281. }
  282. func (ms *RestSink) Close(ctx api.StreamContext) error {
  283. logger := ctx.GetLogger()
  284. logger.Infof("Closing rest sink")
  285. return nil
  286. }