rest_sink_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sink
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/topo/context"
  19. "github.com/lf-edge/ekuiper/internal/topo/transform"
  20. "io/ioutil"
  21. "net/http"
  22. "net/http/httptest"
  23. "reflect"
  24. "testing"
  25. )
  26. type request struct {
  27. Method string
  28. Body string
  29. ContentType string
  30. }
  31. func TestRestSink_Apply(t *testing.T) {
  32. var tests = []struct {
  33. config map[string]interface{}
  34. data []map[string]interface{}
  35. result []request
  36. }{
  37. {
  38. config: map[string]interface{}{
  39. "method": "post",
  40. //"url": "http://localhost/test", //set dynamically to the test server
  41. "sendSingle": true,
  42. },
  43. data: []map[string]interface{}{{
  44. "ab": "hello1",
  45. }, {
  46. "ab": "hello2",
  47. }},
  48. result: []request{{
  49. Method: "POST",
  50. Body: `{"ab":"hello1"}`,
  51. ContentType: "application/json",
  52. }, {
  53. Method: "POST",
  54. Body: `{"ab":"hello2"}`,
  55. ContentType: "application/json",
  56. }},
  57. }, {
  58. config: map[string]interface{}{
  59. "method": "post",
  60. //"url": "http://localhost/test", //set dynamically to the test server
  61. },
  62. data: []map[string]interface{}{{
  63. "ab": "hello1",
  64. }, {
  65. "ab": "hello2",
  66. }},
  67. result: []request{{
  68. Method: "POST",
  69. Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
  70. ContentType: "application/json",
  71. }},
  72. }, {
  73. config: map[string]interface{}{
  74. "method": "get",
  75. //"url": "http://localhost/test", //set dynamically to the test server
  76. },
  77. data: []map[string]interface{}{{
  78. "ab": "hello1",
  79. }, {
  80. "ab": "hello2",
  81. }},
  82. result: []request{{
  83. Method: "GET",
  84. ContentType: "",
  85. }},
  86. }, {
  87. config: map[string]interface{}{
  88. "method": "put",
  89. //"url": "http://localhost/test", //set dynamically to the test server
  90. "bodyType": "text",
  91. },
  92. data: []map[string]interface{}{{
  93. "ab": "hello1",
  94. }, {
  95. "ab": "hello2",
  96. }},
  97. result: []request{{
  98. Method: "PUT",
  99. ContentType: "text/plain",
  100. Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
  101. }},
  102. }, {
  103. config: map[string]interface{}{
  104. "method": "post",
  105. //"url": "http://localhost/test", //set dynamically to the test server
  106. "bodyType": "form",
  107. },
  108. data: []map[string]interface{}{{
  109. "ab": "hello1",
  110. }, {
  111. "ab": "hello2",
  112. }},
  113. result: []request{{
  114. Method: "POST",
  115. ContentType: "application/x-www-form-urlencoded;param=value",
  116. Body: `result=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
  117. }},
  118. }, {
  119. config: map[string]interface{}{
  120. "method": "post",
  121. //"url": "http://localhost/test", //set dynamically to the test server
  122. "bodyType": "form",
  123. "sendSingle": true,
  124. },
  125. data: []map[string]interface{}{{
  126. "ab": "hello1",
  127. }, {
  128. "ab": "hello2",
  129. }},
  130. result: []request{{
  131. Method: "POST",
  132. ContentType: "application/x-www-form-urlencoded;param=value",
  133. Body: `ab=hello1`,
  134. }, {
  135. Method: "POST",
  136. ContentType: "application/x-www-form-urlencoded;param=value",
  137. Body: `ab=hello2`,
  138. }},
  139. }, {
  140. config: map[string]interface{}{
  141. "method": "post",
  142. //"url": "http://localhost/test", //set dynamically to the test server
  143. "bodyType": "json",
  144. "sendSingle": true,
  145. "timeout": float64(1000),
  146. },
  147. data: []map[string]interface{}{{
  148. "ab": "hello1",
  149. }, {
  150. "ab": "hello2",
  151. }},
  152. result: []request{{
  153. Method: "POST",
  154. Body: `{"ab":"hello1"}`,
  155. ContentType: "application/json",
  156. }, {
  157. Method: "POST",
  158. Body: `{"ab":"hello2"}`,
  159. ContentType: "application/json",
  160. }},
  161. },
  162. }
  163. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  164. contextLogger := conf.Log.WithField("rule", "TestRestSink_Apply")
  165. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  166. var requests []request
  167. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  168. body, err := ioutil.ReadAll(r.Body)
  169. if err != nil {
  170. fmt.Printf("Error reading body: %v", err)
  171. http.Error(w, "can't read body", http.StatusBadRequest)
  172. return
  173. }
  174. requests = append(requests, request{
  175. Method: r.Method,
  176. Body: string(body),
  177. ContentType: r.Header.Get("Content-Type"),
  178. })
  179. contextLogger.Debugf(string(body))
  180. fmt.Fprintf(w, string(body))
  181. }))
  182. tf, _ := transform.GenTransform("")
  183. defer ts.Close()
  184. for i, tt := range tests {
  185. requests = nil
  186. ss, ok := tt.config["sendSingle"]
  187. if !ok {
  188. ss = false
  189. }
  190. s := &RestSink{}
  191. tt.config["url"] = ts.URL
  192. s.Configure(tt.config)
  193. s.Open(ctx)
  194. vCtx := context.WithValue(ctx, context.TransKey, tf)
  195. if ss.(bool) {
  196. for _, d := range tt.data {
  197. s.Collect(vCtx, d)
  198. }
  199. } else {
  200. s.Collect(vCtx, tt.data)
  201. }
  202. s.Close(ctx)
  203. if !reflect.DeepEqual(tt.result, requests) {
  204. t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
  205. }
  206. }
  207. }
  208. func TestRestSinkTemplate_Apply(t *testing.T) {
  209. var tests = []struct {
  210. config map[string]interface{}
  211. data [][]byte
  212. result []request
  213. }{
  214. {
  215. config: map[string]interface{}{
  216. "method": "post",
  217. //"url": "http://localhost/test", //set dynamically to the test server
  218. "sendSingle": true,
  219. "dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
  220. },
  221. data: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  222. result: []request{{
  223. Method: "POST",
  224. Body: `{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`,
  225. ContentType: "application/json",
  226. }, {
  227. Method: "POST",
  228. Body: `{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`,
  229. ContentType: "application/json",
  230. }},
  231. }, {
  232. config: map[string]interface{}{
  233. "method": "post",
  234. //"url": "http://localhost/test", //set dynamically to the test server
  235. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  236. },
  237. data: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  238. result: []request{{
  239. Method: "POST",
  240. Body: `{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`,
  241. ContentType: "application/json",
  242. }},
  243. }, {
  244. config: map[string]interface{}{
  245. "method": "get",
  246. //"url": "http://localhost/test", //set dynamically to the test server
  247. "dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
  248. },
  249. data: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`)},
  250. result: []request{{
  251. Method: "GET",
  252. ContentType: "",
  253. }},
  254. }, {
  255. config: map[string]interface{}{
  256. "method": "put",
  257. //"url": "http://localhost/test", //set dynamically to the test server
  258. "bodyType": "html",
  259. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  260. },
  261. data: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  262. result: []request{{
  263. Method: "PUT",
  264. ContentType: "text/html",
  265. Body: `<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`,
  266. }},
  267. }, {
  268. config: map[string]interface{}{
  269. "method": "post",
  270. //"url": "http://localhost/test", //set dynamically to the test server
  271. "bodyType": "form",
  272. "dataTemplate": `{"content":{{json .}}}`,
  273. },
  274. data: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  275. result: []request{{
  276. Method: "POST",
  277. ContentType: "application/x-www-form-urlencoded;param=value",
  278. Body: `content=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
  279. }},
  280. }, {
  281. config: map[string]interface{}{
  282. "method": "post",
  283. //"url": "http://localhost/test", //set dynamically to the test server
  284. "bodyType": "form",
  285. "sendSingle": true,
  286. "dataTemplate": `{"newab":"{{.ab}}"}`,
  287. },
  288. data: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  289. result: []request{{
  290. Method: "POST",
  291. ContentType: "application/x-www-form-urlencoded;param=value",
  292. Body: `newab=hello1`,
  293. }, {
  294. Method: "POST",
  295. ContentType: "application/x-www-form-urlencoded;param=value",
  296. Body: `newab=hello2`,
  297. }},
  298. }, {
  299. config: map[string]interface{}{
  300. "method": "post",
  301. //"url": "http://localhost/test", //set dynamically to the test server
  302. "bodyType": "json",
  303. "sendSingle": true,
  304. "timeout": float64(1000),
  305. "dataTemplate": `{"newab":"{{.ab}}"}`,
  306. },
  307. data: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  308. result: []request{{
  309. Method: "POST",
  310. Body: `{"newab":"hello1"}`,
  311. ContentType: "application/json",
  312. }, {
  313. Method: "POST",
  314. Body: `{"newab":"hello2"}`,
  315. ContentType: "application/json",
  316. }},
  317. },
  318. }
  319. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  320. contextLogger := conf.Log.WithField("rule", "TestRestSink_Apply")
  321. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  322. var requests []request
  323. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  324. body, err := ioutil.ReadAll(r.Body)
  325. if err != nil {
  326. fmt.Printf("Error reading body: %v", err)
  327. http.Error(w, "can't read body", http.StatusBadRequest)
  328. return
  329. }
  330. requests = append(requests, request{
  331. Method: r.Method,
  332. Body: string(body),
  333. ContentType: r.Header.Get("Content-Type"),
  334. })
  335. contextLogger.Debugf(string(body))
  336. fmt.Fprintf(w, string(body))
  337. }))
  338. defer ts.Close()
  339. for i, tt := range tests {
  340. requests = nil
  341. s := &RestSink{}
  342. tt.config["url"] = ts.URL
  343. s.Configure(tt.config)
  344. s.Open(ctx)
  345. vCtx := context.WithValue(ctx, context.TransKey, transform.TransFunc(func(d interface{}) ([]byte, bool, error) {
  346. return d.([]byte), true, nil
  347. }))
  348. for _, d := range tt.data {
  349. s.Collect(vCtx, d)
  350. }
  351. s.Close(ctx)
  352. if !reflect.DeepEqual(tt.result, requests) {
  353. t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
  354. }
  355. }
  356. }