rest_sink_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package http
  15. import (
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "net/http/httptest"
  20. "reflect"
  21. "strings"
  22. "testing"
  23. "time"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/topo/context"
  26. "github.com/lf-edge/ekuiper/internal/topo/transform"
  27. "github.com/lf-edge/ekuiper/pkg/errorx"
  28. )
  29. type request struct {
  30. Method string
  31. Body string
  32. ContentType string
  33. }
  34. func TestRestSink_Apply(t *testing.T) {
  35. tests := []struct {
  36. config map[string]interface{}
  37. data []map[string]interface{}
  38. result []request
  39. }{
  40. {
  41. config: map[string]interface{}{
  42. "method": "post",
  43. //"url": "http://localhost/test", //set dynamically to the test server
  44. "sendSingle": true,
  45. },
  46. data: []map[string]interface{}{{
  47. "ab": "hello1",
  48. }, {
  49. "ab": "hello2",
  50. }},
  51. result: []request{{
  52. Method: "POST",
  53. Body: `{"ab":"hello1"}`,
  54. ContentType: "application/json",
  55. }, {
  56. Method: "POST",
  57. Body: `{"ab":"hello2"}`,
  58. ContentType: "application/json",
  59. }},
  60. }, {
  61. config: map[string]interface{}{
  62. "method": "post",
  63. //"url": "http://localhost/test", //set dynamically to the test server
  64. },
  65. data: []map[string]interface{}{{
  66. "ab": "hello1",
  67. }, {
  68. "ab": "hello2",
  69. }},
  70. result: []request{{
  71. Method: "POST",
  72. Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
  73. ContentType: "application/json",
  74. }},
  75. }, {
  76. config: map[string]interface{}{
  77. "method": "get",
  78. //"url": "http://localhost/test", //set dynamically to the test server
  79. },
  80. data: []map[string]interface{}{{
  81. "ab": "hello1",
  82. }, {
  83. "ab": "hello2",
  84. }},
  85. result: []request{{
  86. Method: "GET",
  87. ContentType: "",
  88. }},
  89. }, {
  90. config: map[string]interface{}{
  91. "method": "put",
  92. //"url": "http://localhost/test", //set dynamically to the test server
  93. "bodyType": "text",
  94. },
  95. data: []map[string]interface{}{{
  96. "ab": "hello1",
  97. }, {
  98. "ab": "hello2",
  99. }},
  100. result: []request{{
  101. Method: "PUT",
  102. ContentType: "text/plain",
  103. Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
  104. }},
  105. }, {
  106. config: map[string]interface{}{
  107. "method": "post",
  108. //"url": "http://localhost/test", //set dynamically to the test server
  109. "bodyType": "form",
  110. },
  111. data: []map[string]interface{}{{
  112. "ab": "hello1",
  113. }, {
  114. "ab": "hello2",
  115. }},
  116. result: []request{{
  117. Method: "POST",
  118. ContentType: "application/x-www-form-urlencoded;param=value",
  119. Body: `result=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
  120. }},
  121. }, {
  122. config: map[string]interface{}{
  123. "method": "post",
  124. //"url": "http://localhost/test", //set dynamically to the test server
  125. "bodyType": "form",
  126. "sendSingle": true,
  127. },
  128. data: []map[string]interface{}{{
  129. "ab": "hello1",
  130. }, {
  131. "ab": "hello2",
  132. }},
  133. result: []request{{
  134. Method: "POST",
  135. ContentType: "application/x-www-form-urlencoded;param=value",
  136. Body: `ab=hello1`,
  137. }, {
  138. Method: "POST",
  139. ContentType: "application/x-www-form-urlencoded;param=value",
  140. Body: `ab=hello2`,
  141. }},
  142. }, {
  143. config: map[string]interface{}{
  144. "method": "post",
  145. //"url": "http://localhost/test", //set dynamically to the test server
  146. "bodyType": "json",
  147. "sendSingle": true,
  148. "timeout": float64(1000),
  149. },
  150. data: []map[string]interface{}{{
  151. "ab": "hello1",
  152. }, {
  153. "ab": "hello2",
  154. }},
  155. result: []request{{
  156. Method: "POST",
  157. Body: `{"ab":"hello1"}`,
  158. ContentType: "application/json",
  159. }, {
  160. Method: "POST",
  161. Body: `{"ab":"hello2"}`,
  162. ContentType: "application/json",
  163. }},
  164. },
  165. }
  166. t.Logf("The test bucket size is %d.", len(tests))
  167. contextLogger := conf.Log.WithField("rule", "TestRestSink_Apply")
  168. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  169. var requests []request
  170. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  171. body, err := io.ReadAll(r.Body)
  172. if err != nil {
  173. fmt.Printf("Error reading body: %v", err)
  174. http.Error(w, "can't read body", http.StatusBadRequest)
  175. return
  176. }
  177. requests = append(requests, request{
  178. Method: r.Method,
  179. Body: string(body),
  180. ContentType: r.Header.Get("Content-Type"),
  181. })
  182. contextLogger.Debugf(string(body))
  183. fmt.Fprint(w, string(body))
  184. }))
  185. tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
  186. defer ts.Close()
  187. for i, tt := range tests {
  188. requests = nil
  189. ss, ok := tt.config["sendSingle"]
  190. if !ok {
  191. ss = false
  192. }
  193. s := &RestSink{}
  194. tt.config["url"] = ts.URL
  195. s.Configure(tt.config)
  196. s.Open(ctx)
  197. vCtx := context.WithValue(ctx, context.TransKey, tf)
  198. if ss.(bool) {
  199. for _, d := range tt.data {
  200. s.Collect(vCtx, d)
  201. }
  202. } else {
  203. s.Collect(vCtx, tt.data)
  204. }
  205. s.Close(ctx)
  206. if !reflect.DeepEqual(tt.result, requests) {
  207. t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
  208. }
  209. }
  210. }
  211. func TestRestSinkTemplate_Apply(t *testing.T) {
  212. tests := []struct {
  213. config map[string]interface{}
  214. data [][]byte
  215. result []request
  216. }{
  217. {
  218. config: map[string]interface{}{
  219. "method": "post",
  220. //"url": "http://localhost/test", //set dynamically to the test server
  221. "sendSingle": true,
  222. "dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
  223. },
  224. data: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  225. result: []request{{
  226. Method: "POST",
  227. Body: `{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`,
  228. ContentType: "application/json",
  229. }, {
  230. Method: "POST",
  231. Body: `{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`,
  232. ContentType: "application/json",
  233. }},
  234. }, {
  235. config: map[string]interface{}{
  236. "method": "post",
  237. //"url": "http://localhost/test", //set dynamically to the test server
  238. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  239. },
  240. data: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  241. result: []request{{
  242. Method: "POST",
  243. Body: `{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`,
  244. ContentType: "application/json",
  245. }},
  246. }, {
  247. config: map[string]interface{}{
  248. "method": "get",
  249. //"url": "http://localhost/test", //set dynamically to the test server
  250. "dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
  251. },
  252. data: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`)},
  253. result: []request{{
  254. Method: "GET",
  255. ContentType: "",
  256. }},
  257. }, {
  258. config: map[string]interface{}{
  259. "method": "put",
  260. //"url": "http://localhost/test", //set dynamically to the test server
  261. "bodyType": "html",
  262. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  263. },
  264. data: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  265. result: []request{{
  266. Method: "PUT",
  267. ContentType: "text/html",
  268. Body: `<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`,
  269. }},
  270. }, {
  271. config: map[string]interface{}{
  272. "method": "post",
  273. //"url": "http://localhost/test", //set dynamically to the test server
  274. "bodyType": "form",
  275. "dataTemplate": `{"content":{{json .}}}`,
  276. },
  277. data: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  278. result: []request{{
  279. Method: "POST",
  280. ContentType: "application/x-www-form-urlencoded;param=value",
  281. Body: `content=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
  282. }},
  283. }, {
  284. config: map[string]interface{}{
  285. "method": "post",
  286. //"url": "http://localhost/test", //set dynamically to the test server
  287. "bodyType": "form",
  288. "sendSingle": true,
  289. "dataTemplate": `{"newab":"{{.ab}}"}`,
  290. },
  291. data: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  292. result: []request{{
  293. Method: "POST",
  294. ContentType: "application/x-www-form-urlencoded;param=value",
  295. Body: `newab=hello1`,
  296. }, {
  297. Method: "POST",
  298. ContentType: "application/x-www-form-urlencoded;param=value",
  299. Body: `newab=hello2`,
  300. }},
  301. }, {
  302. config: map[string]interface{}{
  303. "method": "post",
  304. //"url": "http://localhost/test", //set dynamically to the test server
  305. "bodyType": "json",
  306. "sendSingle": true,
  307. "timeout": float64(1000),
  308. "dataTemplate": `{"newab":"{{.ab}}"}`,
  309. },
  310. data: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  311. result: []request{{
  312. Method: "POST",
  313. Body: `{"newab":"hello1"}`,
  314. ContentType: "application/json",
  315. }, {
  316. Method: "POST",
  317. Body: `{"newab":"hello2"}`,
  318. ContentType: "application/json",
  319. }},
  320. },
  321. }
  322. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  323. contextLogger := conf.Log.WithField("rule", "TestRestSink_Apply")
  324. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  325. var requests []request
  326. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  327. body, err := io.ReadAll(r.Body)
  328. if err != nil {
  329. fmt.Printf("Error reading body: %v", err)
  330. http.Error(w, "can't read body", http.StatusBadRequest)
  331. return
  332. }
  333. requests = append(requests, request{
  334. Method: r.Method,
  335. Body: string(body),
  336. ContentType: r.Header.Get("Content-Type"),
  337. })
  338. contextLogger.Debugf(string(body))
  339. fmt.Fprint(w, string(body))
  340. }))
  341. defer ts.Close()
  342. for i, tt := range tests {
  343. requests = nil
  344. s := &RestSink{}
  345. tt.config["url"] = ts.URL
  346. s.Configure(tt.config)
  347. s.Open(ctx)
  348. vCtx := context.WithValue(ctx, context.TransKey, transform.TransFunc(func(d interface{}) ([]byte, bool, error) {
  349. return d.([]byte), true, nil
  350. }))
  351. for _, d := range tt.data {
  352. s.Collect(vCtx, d)
  353. }
  354. s.Close(ctx)
  355. if !reflect.DeepEqual(tt.result, requests) {
  356. t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
  357. }
  358. }
  359. }
  360. func TestRestSinkErrorLog(t *testing.T) {
  361. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  362. result := `{"data":[],"extra":"Success","returncode":1,"returnmessage":""}`
  363. time.Sleep(30 * time.Millisecond)
  364. w.WriteHeader(http.StatusNotFound)
  365. w.Write([]byte(result))
  366. }))
  367. defer ts.Close()
  368. t.Run("Test rest sink timeout and return correct error info", func(t *testing.T) {
  369. s := &RestSink{}
  370. config := map[string]interface{}{
  371. "url": ts.URL,
  372. "timeout": float64(10),
  373. }
  374. s.Configure(config)
  375. s.Open(context.Background())
  376. tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
  377. vCtx := context.WithValue(context.Background(), context.TransKey, tf)
  378. reqBody := []map[string]interface{}{
  379. {"ab": "hello1"},
  380. {"ab": "hello2"},
  381. }
  382. err := s.Collect(vCtx, reqBody)
  383. if strings.HasPrefix(err.Error(), errorx.IOErr) && !strings.Contains(err.Error(), "hello1") {
  384. t.Errorf("should include request body, but got %s", err.Error())
  385. }
  386. fmt.Println(err.Error())
  387. s.Close(context.Background())
  388. })
  389. t.Run("Test error info", func(t *testing.T) {
  390. s := &RestSink{}
  391. config := map[string]interface{}{
  392. "url": ts.URL,
  393. "method": "put",
  394. "bodyType": "text",
  395. "responseType": "body",
  396. "timeout": float64(1000),
  397. }
  398. s.Configure(config)
  399. s.Open(context.Background())
  400. tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
  401. vCtx := context.WithValue(context.Background(), context.TransKey, tf)
  402. err := s.Collect(vCtx, []map[string]interface{}{
  403. {"ab": "hello1"},
  404. {"ab": "hello2"},
  405. })
  406. fmt.Println(err.Error())
  407. if strings.HasPrefix(err.Error(), errorx.IOErr) && !strings.Contains(err.Error(), "404") {
  408. t.Errorf("should start with io error, but got %s", err.Error())
  409. }
  410. s.Close(context.Background())
  411. })
  412. }