rest_sink_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sink
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "io/ioutil"
  21. "net/http"
  22. "net/http/httptest"
  23. "reflect"
  24. "testing"
  25. )
  26. type request struct {
  27. Method string
  28. Body string
  29. ContentType string
  30. }
  31. func TestRestSink_Apply(t *testing.T) {
  32. var tests = []struct {
  33. config map[string]interface{}
  34. data []map[string]interface{}
  35. result []request
  36. }{
  37. {
  38. config: map[string]interface{}{
  39. "method": "post",
  40. //"url": "http://localhost/test", //set dynamically to the test server
  41. "sendSingle": true,
  42. },
  43. data: []map[string]interface{}{{
  44. "ab": "hello1",
  45. }, {
  46. "ab": "hello2",
  47. }},
  48. result: []request{{
  49. Method: "POST",
  50. Body: `{"ab":"hello1"}`,
  51. ContentType: "application/json",
  52. }, {
  53. Method: "POST",
  54. Body: `{"ab":"hello2"}`,
  55. ContentType: "application/json",
  56. }},
  57. }, {
  58. config: map[string]interface{}{
  59. "method": "post",
  60. //"url": "http://localhost/test", //set dynamically to the test server
  61. },
  62. data: []map[string]interface{}{{
  63. "ab": "hello1",
  64. }, {
  65. "ab": "hello2",
  66. }},
  67. result: []request{{
  68. Method: "POST",
  69. Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
  70. ContentType: "application/json",
  71. }},
  72. }, {
  73. config: map[string]interface{}{
  74. "method": "get",
  75. //"url": "http://localhost/test", //set dynamically to the test server
  76. },
  77. data: []map[string]interface{}{{
  78. "ab": "hello1",
  79. }, {
  80. "ab": "hello2",
  81. }},
  82. result: []request{{
  83. Method: "GET",
  84. ContentType: "",
  85. }},
  86. }, {
  87. config: map[string]interface{}{
  88. "method": "put",
  89. //"url": "http://localhost/test", //set dynamically to the test server
  90. "bodyType": "text",
  91. },
  92. data: []map[string]interface{}{{
  93. "ab": "hello1",
  94. }, {
  95. "ab": "hello2",
  96. }},
  97. result: []request{{
  98. Method: "PUT",
  99. ContentType: "text/plain",
  100. Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
  101. }},
  102. }, {
  103. config: map[string]interface{}{
  104. "method": "post",
  105. //"url": "http://localhost/test", //set dynamically to the test server
  106. "bodyType": "form",
  107. },
  108. data: []map[string]interface{}{{
  109. "ab": "hello1",
  110. }, {
  111. "ab": "hello2",
  112. }},
  113. result: []request{{
  114. Method: "POST",
  115. ContentType: "application/x-www-form-urlencoded;param=value",
  116. Body: `result=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
  117. }},
  118. }, {
  119. config: map[string]interface{}{
  120. "method": "post",
  121. //"url": "http://localhost/test", //set dynamically to the test server
  122. "bodyType": "form",
  123. "sendSingle": true,
  124. },
  125. data: []map[string]interface{}{{
  126. "ab": "hello1",
  127. }, {
  128. "ab": "hello2",
  129. }},
  130. result: []request{{
  131. Method: "POST",
  132. ContentType: "application/x-www-form-urlencoded;param=value",
  133. Body: `ab=hello1`,
  134. }, {
  135. Method: "POST",
  136. ContentType: "application/x-www-form-urlencoded;param=value",
  137. Body: `ab=hello2`,
  138. }},
  139. }, {
  140. config: map[string]interface{}{
  141. "method": "post",
  142. //"url": "http://localhost/test", //set dynamically to the test server
  143. "bodyType": "json",
  144. "sendSingle": true,
  145. "timeout": float64(1000),
  146. },
  147. data: []map[string]interface{}{{
  148. "ab": "hello1",
  149. }, {
  150. "ab": "hello2",
  151. }},
  152. result: []request{{
  153. Method: "POST",
  154. Body: `{"ab":"hello1"}`,
  155. ContentType: "application/json",
  156. }, {
  157. Method: "POST",
  158. Body: `{"ab":"hello2"}`,
  159. ContentType: "application/json",
  160. }},
  161. },
  162. }
  163. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  164. contextLogger := conf.Log.WithField("rule", "TestRestSink_Apply")
  165. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  166. var requests []request
  167. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  168. body, err := ioutil.ReadAll(r.Body)
  169. if err != nil {
  170. fmt.Printf("Error reading body: %v", err)
  171. http.Error(w, "can't read body", http.StatusBadRequest)
  172. return
  173. }
  174. requests = append(requests, request{
  175. Method: r.Method,
  176. Body: string(body),
  177. ContentType: r.Header.Get("Content-Type"),
  178. })
  179. contextLogger.Debugf(string(body))
  180. fmt.Fprintf(w, string(body))
  181. }))
  182. defer ts.Close()
  183. for i, tt := range tests {
  184. requests = nil
  185. ss, ok := tt.config["sendSingle"]
  186. if !ok {
  187. ss = false
  188. }
  189. s := &RestSink{}
  190. tt.config["url"] = ts.URL
  191. s.Configure(tt.config)
  192. s.Open(ctx)
  193. if ss.(bool) {
  194. for _, d := range tt.data {
  195. input, err := json.Marshal(d)
  196. if err != nil {
  197. t.Errorf("Failed to parse the input into []byte]")
  198. continue
  199. }
  200. s.Collect(ctx, input)
  201. }
  202. } else {
  203. input, err := json.Marshal(tt.data)
  204. if err != nil {
  205. t.Errorf("Failed to parse the input into []byte]")
  206. continue
  207. }
  208. s.Collect(ctx, input)
  209. }
  210. s.Close(ctx)
  211. if !reflect.DeepEqual(tt.result, requests) {
  212. t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
  213. }
  214. }
  215. }
  216. func TestRestSinkTemplate_Apply(t *testing.T) {
  217. var tests = []struct {
  218. config map[string]interface{}
  219. data [][]byte
  220. result []request
  221. }{
  222. {
  223. config: map[string]interface{}{
  224. "method": "post",
  225. //"url": "http://localhost/test", //set dynamically to the test server
  226. "sendSingle": true,
  227. "dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
  228. },
  229. data: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
  230. result: []request{{
  231. Method: "POST",
  232. Body: `{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`,
  233. ContentType: "application/json",
  234. }, {
  235. Method: "POST",
  236. Body: `{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`,
  237. ContentType: "application/json",
  238. }},
  239. }, {
  240. config: map[string]interface{}{
  241. "method": "post",
  242. //"url": "http://localhost/test", //set dynamically to the test server
  243. "dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
  244. },
  245. data: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
  246. result: []request{{
  247. Method: "POST",
  248. Body: `{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`,
  249. ContentType: "application/json",
  250. }},
  251. }, {
  252. config: map[string]interface{}{
  253. "method": "get",
  254. //"url": "http://localhost/test", //set dynamically to the test server
  255. "dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
  256. },
  257. data: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`)},
  258. result: []request{{
  259. Method: "GET",
  260. ContentType: "",
  261. }},
  262. }, {
  263. config: map[string]interface{}{
  264. "method": "put",
  265. //"url": "http://localhost/test", //set dynamically to the test server
  266. "bodyType": "html",
  267. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
  268. },
  269. data: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
  270. result: []request{{
  271. Method: "PUT",
  272. ContentType: "text/html",
  273. Body: `<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`,
  274. }},
  275. }, {
  276. config: map[string]interface{}{
  277. "method": "post",
  278. //"url": "http://localhost/test", //set dynamically to the test server
  279. "bodyType": "form",
  280. "dataTemplate": `{"content":{{json .}}}`,
  281. },
  282. data: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
  283. result: []request{{
  284. Method: "POST",
  285. ContentType: "application/x-www-form-urlencoded;param=value",
  286. Body: `content=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
  287. }},
  288. }, {
  289. config: map[string]interface{}{
  290. "method": "post",
  291. //"url": "http://localhost/test", //set dynamically to the test server
  292. "bodyType": "form",
  293. "sendSingle": true,
  294. "dataTemplate": `{"newab":"{{.ab}}"}`,
  295. },
  296. data: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  297. result: []request{{
  298. Method: "POST",
  299. ContentType: "application/x-www-form-urlencoded;param=value",
  300. Body: `newab=hello1`,
  301. }, {
  302. Method: "POST",
  303. ContentType: "application/x-www-form-urlencoded;param=value",
  304. Body: `newab=hello2`,
  305. }},
  306. }, {
  307. config: map[string]interface{}{
  308. "method": "post",
  309. //"url": "http://localhost/test", //set dynamically to the test server
  310. "bodyType": "json",
  311. "sendSingle": true,
  312. "timeout": float64(1000),
  313. "dataTemplate": `{"newab":"{{.ab}}"}`,
  314. },
  315. data: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
  316. result: []request{{
  317. Method: "POST",
  318. Body: `{"newab":"hello1"}`,
  319. ContentType: "application/json",
  320. }, {
  321. Method: "POST",
  322. Body: `{"newab":"hello2"}`,
  323. ContentType: "application/json",
  324. }},
  325. },
  326. }
  327. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  328. contextLogger := conf.Log.WithField("rule", "TestRestSink_Apply")
  329. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  330. var requests []request
  331. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  332. body, err := ioutil.ReadAll(r.Body)
  333. if err != nil {
  334. fmt.Printf("Error reading body: %v", err)
  335. http.Error(w, "can't read body", http.StatusBadRequest)
  336. return
  337. }
  338. requests = append(requests, request{
  339. Method: r.Method,
  340. Body: string(body),
  341. ContentType: r.Header.Get("Content-Type"),
  342. })
  343. contextLogger.Debugf(string(body))
  344. fmt.Fprintf(w, string(body))
  345. }))
  346. defer ts.Close()
  347. for i, tt := range tests {
  348. requests = nil
  349. s := &RestSink{}
  350. tt.config["url"] = ts.URL
  351. s.Configure(tt.config)
  352. s.Open(ctx)
  353. for _, d := range tt.data {
  354. s.Collect(ctx, d)
  355. }
  356. s.Close(ctx)
  357. if !reflect.DeepEqual(tt.result, requests) {
  358. t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
  359. }
  360. }
  361. }