httppull_source_test.go 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package http
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net"
  21. "net/http"
  22. "net/http/httptest"
  23. "path/filepath"
  24. "reflect"
  25. "strconv"
  26. "testing"
  27. "time"
  28. "github.com/gorilla/mux"
  29. "github.com/stretchr/testify/assert"
  30. "github.com/lf-edge/ekuiper/internal/conf"
  31. "github.com/lf-edge/ekuiper/internal/io/mock"
  32. "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
  33. "github.com/lf-edge/ekuiper/internal/xsql"
  34. "github.com/lf-edge/ekuiper/pkg/api"
  35. )
  36. func jsonOut(w http.ResponseWriter, out interface{}) {
  37. w.Header().Add("Content-Type", "application/json")
  38. enc := json.NewEncoder(w)
  39. err := enc.Encode(out)
  40. // Problems encoding
  41. if err != nil {
  42. http.Error(w, err.Error(), http.StatusBadRequest)
  43. }
  44. }
  45. const (
  46. DefaultToken = "privatisation"
  47. RefreshToken = "privaterefresh"
  48. )
  49. // mock http auth server
  50. func mockAuthServer() *httptest.Server {
  51. l, _ := net.Listen("tcp", "127.0.0.1:52345")
  52. router := mux.NewRouter()
  53. i := 0
  54. router.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) {
  55. body := &struct {
  56. Username string `json:"username"`
  57. Password string `json:"password"`
  58. }{}
  59. err := json.NewDecoder(r.Body).Decode(body)
  60. if err != nil {
  61. http.Error(w, err.Error(), http.StatusBadRequest)
  62. }
  63. if body.Username != "admin" || body.Password != "0000" {
  64. http.Error(w, "invalid username or password", http.StatusBadRequest)
  65. }
  66. out := &struct {
  67. Token string `json:"token"`
  68. RefreshToken string `json:"refresh_token"`
  69. ClientId string `json:"client_id"`
  70. Expires int64 `json:"expires"`
  71. }{
  72. Token: DefaultToken,
  73. RefreshToken: RefreshToken,
  74. ClientId: "test",
  75. Expires: 36000,
  76. }
  77. jsonOut(w, out)
  78. }).Methods(http.MethodPost)
  79. router.HandleFunc("/refresh", func(w http.ResponseWriter, r *http.Request) {
  80. token := r.Header.Get("Authorization")
  81. if token != "Bearer "+DefaultToken {
  82. http.Error(w, "invalid token", http.StatusBadRequest)
  83. }
  84. rt := r.Header.Get("RefreshToken")
  85. if rt != RefreshToken {
  86. http.Error(w, "invalid refresh token", http.StatusBadRequest)
  87. }
  88. out := &struct {
  89. Token string `json:"token"`
  90. RefreshToken string `json:"refresh_token"`
  91. ClientId string `json:"client_id"`
  92. Expires int64 `json:"expires"`
  93. }{
  94. Token: DefaultToken,
  95. RefreshToken: RefreshToken,
  96. ClientId: "test",
  97. Expires: 36000,
  98. }
  99. jsonOut(w, out)
  100. }).Methods(http.MethodPost)
  101. router.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
  102. token := r.Header.Get("Authorization")
  103. if token != "Bearer "+DefaultToken {
  104. http.Error(w, "invalid token", http.StatusBadRequest)
  105. }
  106. out := &struct {
  107. DeviceId string `json:"device_id"`
  108. Temperature float64 `json:"temperature"`
  109. Humidity float64 `json:"humidity"`
  110. }{
  111. DeviceId: "device1",
  112. Temperature: 25.5,
  113. Humidity: 60.0,
  114. }
  115. jsonOut(w, out)
  116. }).Methods(http.MethodGet)
  117. // Return same data for 3 times
  118. router.HandleFunc("/data2", func(w http.ResponseWriter, r *http.Request) {
  119. out := &struct {
  120. Code int `json:"code"`
  121. Data struct {
  122. DeviceId string `json:"device_id"`
  123. Temperature float64 `json:"temperature"`
  124. Humidity float64 `json:"humidity"`
  125. } `json:"data"`
  126. }{
  127. Code: 200,
  128. Data: struct {
  129. DeviceId string `json:"device_id"`
  130. Temperature float64 `json:"temperature"`
  131. Humidity float64 `json:"humidity"`
  132. }{
  133. DeviceId: "device" + strconv.Itoa(i/3),
  134. Temperature: 25.5,
  135. Humidity: 60.0,
  136. },
  137. }
  138. i++
  139. jsonOut(w, out)
  140. }).Methods(http.MethodGet)
  141. router.HandleFunc("/data3", func(w http.ResponseWriter, r *http.Request) {
  142. out := []*struct {
  143. Code int `json:"code"`
  144. Data struct {
  145. DeviceId string `json:"device_id"`
  146. Temperature float64 `json:"temperature"`
  147. Humidity float64 `json:"humidity"`
  148. } `json:"data"`
  149. }{
  150. {
  151. Code: 200,
  152. Data: struct {
  153. DeviceId string `json:"device_id"`
  154. Temperature float64 `json:"temperature"`
  155. Humidity float64 `json:"humidity"`
  156. }{
  157. DeviceId: "d1",
  158. Temperature: 25.5,
  159. Humidity: 60.0,
  160. },
  161. },
  162. {
  163. Code: 200,
  164. Data: struct {
  165. DeviceId string `json:"device_id"`
  166. Temperature float64 `json:"temperature"`
  167. Humidity float64 `json:"humidity"`
  168. }{
  169. DeviceId: "d2",
  170. Temperature: 25.5,
  171. Humidity: 60.0,
  172. },
  173. },
  174. }
  175. jsonOut(w, out)
  176. }).Methods(http.MethodGet)
  177. // data4 receives time range in url
  178. router.HandleFunc("/data4", func(w http.ResponseWriter, r *http.Request) {
  179. device := r.URL.Query().Get("device")
  180. s := r.URL.Query().Get("start")
  181. e := r.URL.Query().Get("end")
  182. start, _ := strconv.ParseInt(s, 10, 64)
  183. end, _ := strconv.ParseInt(e, 10, 64)
  184. out := &struct {
  185. Code int `json:"code"`
  186. Data struct {
  187. DeviceId string `json:"device_id"`
  188. Temperature int64 `json:"temperature"`
  189. Humidity int64 `json:"humidity"`
  190. } `json:"data"`
  191. }{
  192. Code: 200,
  193. Data: struct {
  194. DeviceId string `json:"device_id"`
  195. Temperature int64 `json:"temperature"`
  196. Humidity int64 `json:"humidity"`
  197. }{
  198. DeviceId: device,
  199. Temperature: start % 50,
  200. Humidity: end % 100,
  201. },
  202. }
  203. jsonOut(w, out)
  204. }).Methods(http.MethodGet)
  205. // data5 receives time range in body
  206. router.HandleFunc("/data5", func(w http.ResponseWriter, r *http.Request) {
  207. body, err := io.ReadAll(r.Body)
  208. if err != nil {
  209. http.Error(w, "Failed to read request body", http.StatusBadRequest)
  210. return
  211. }
  212. // Create a Person struct to hold the JSON data
  213. var ddd struct {
  214. Device string `json:"device"`
  215. Start int64 `json:"start"`
  216. End int64 `json:"end"`
  217. }
  218. // Unmarshal the JSON data into the Person struct
  219. err = json.Unmarshal(body, &ddd)
  220. if err != nil {
  221. http.Error(w, "Failed to parse JSON", http.StatusBadRequest)
  222. return
  223. }
  224. out := &struct {
  225. Code int `json:"code"`
  226. Data struct {
  227. DeviceId string `json:"device_id"`
  228. Temperature int64 `json:"temperature"`
  229. Humidity int64 `json:"humidity"`
  230. } `json:"data"`
  231. }{
  232. Code: 200,
  233. Data: struct {
  234. DeviceId string `json:"device_id"`
  235. Temperature int64 `json:"temperature"`
  236. Humidity int64 `json:"humidity"`
  237. }{
  238. DeviceId: ddd.Device,
  239. Temperature: ddd.Start % 50,
  240. Humidity: ddd.End % 100,
  241. },
  242. }
  243. jsonOut(w, out)
  244. }).Methods(http.MethodPost)
  245. server := httptest.NewUnstartedServer(router)
  246. err := server.Listener.Close()
  247. if err != nil {
  248. panic(err)
  249. }
  250. server.Listener = l
  251. return server
  252. }
  253. var wrongPath, _ = filepath.Abs("/tmp/wrong")
  254. // Test configure to properties
  255. func TestConfigure(t *testing.T) {
  256. tests := []struct {
  257. name string
  258. props map[string]interface{}
  259. err error
  260. config *RawConf
  261. accessConf *AccessTokenConf
  262. refreshConf *RefreshTokenConf
  263. tokens map[string]interface{}
  264. }{
  265. {
  266. name: "default",
  267. props: map[string]interface{}{
  268. "incremental": true,
  269. "url": "http://localhost:9090/",
  270. },
  271. config: &RawConf{
  272. Incremental: true,
  273. Url: "http://localhost:9090/",
  274. ResendUrl: "http://localhost:9090/",
  275. Method: http.MethodGet,
  276. Interval: DefaultInterval,
  277. Timeout: DefaultTimeout,
  278. BodyType: "none",
  279. ResponseType: "code",
  280. InsecureSkipVerify: true,
  281. },
  282. },
  283. // Test wrong properties
  284. {
  285. name: "wrong props",
  286. props: map[string]interface{}{
  287. "incremental": true,
  288. "url": 123,
  289. },
  290. err: fmt.Errorf("fail to parse the properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '123'"),
  291. },
  292. {
  293. name: "empty url",
  294. props: map[string]interface{}{
  295. "incremental": true,
  296. "url": "",
  297. },
  298. err: fmt.Errorf("url is required"),
  299. },
  300. {
  301. name: "wrong method",
  302. props: map[string]interface{}{
  303. "url": "http://localhost:9090/",
  304. "method": "wrong",
  305. },
  306. err: fmt.Errorf("Not supported HTTP method wrong."),
  307. },
  308. {
  309. name: "wrong bodytype",
  310. props: map[string]interface{}{
  311. "url": "http://localhost:9090/",
  312. "bodyType": "wrong",
  313. },
  314. err: fmt.Errorf("Not valid body type value wrong."),
  315. },
  316. {
  317. name: "wrong response type",
  318. props: map[string]interface{}{
  319. "url": "http://localhost:9090/",
  320. "responseType": "wrong",
  321. },
  322. err: fmt.Errorf("Not valid response type value wrong."),
  323. },
  324. {
  325. name: "wrong url",
  326. props: map[string]interface{}{
  327. "url": "http:/localhost:9090/",
  328. },
  329. err: fmt.Errorf("Invalid url, host not found"),
  330. },
  331. {
  332. name: "wrong interval",
  333. props: map[string]interface{}{
  334. "url": "http:/localhost:9090/",
  335. "interval": -2,
  336. },
  337. err: fmt.Errorf("interval must be greater than 0"),
  338. },
  339. {
  340. name: "wrong timeout",
  341. props: map[string]interface{}{
  342. "url": "http:/localhost:9090/",
  343. "timeout": -2,
  344. },
  345. err: fmt.Errorf("timeout must be greater than or equal to 0"),
  346. },
  347. {
  348. name: "wrong tls",
  349. props: map[string]interface{}{
  350. "url": "http://localhost:9090/",
  351. "certificationPath": wrongPath,
  352. },
  353. err: fmt.Errorf(fmt.Sprintf("stat %s: no such file or directory", wrongPath)),
  354. },
  355. // Test oAuth
  356. {
  357. name: "oAuth with access token and constant expire",
  358. props: map[string]interface{}{
  359. "url": "http://localhost:52345/",
  360. "headers": map[string]interface{}{
  361. "Authorization": "Bearer {{.token}}",
  362. },
  363. "oAuth": map[string]interface{}{
  364. "access": map[string]interface{}{
  365. "url": "http://localhost:52345/token",
  366. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  367. "expire": "3600",
  368. },
  369. },
  370. },
  371. config: &RawConf{
  372. Url: "http://localhost:52345/",
  373. ResendUrl: "http://localhost:52345/",
  374. Method: http.MethodGet,
  375. Interval: DefaultInterval,
  376. Timeout: DefaultTimeout,
  377. BodyType: "none",
  378. ResponseType: "code",
  379. InsecureSkipVerify: true,
  380. Headers: map[string]interface{}{
  381. "Authorization": "Bearer {{.token}}",
  382. },
  383. HeadersMap: map[string]string{
  384. "Authorization": "Bearer {{.token}}",
  385. },
  386. OAuth: map[string]map[string]interface{}{
  387. "access": {
  388. "url": "http://localhost:52345/token",
  389. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  390. "expire": "3600",
  391. },
  392. },
  393. },
  394. accessConf: &AccessTokenConf{
  395. Url: "http://localhost:52345/token",
  396. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  397. Expire: "3600",
  398. ExpireInSecond: 3600,
  399. },
  400. tokens: map[string]interface{}{
  401. "token": DefaultToken,
  402. "refresh_token": RefreshToken,
  403. "client_id": "test",
  404. "expires": float64(36000),
  405. },
  406. },
  407. {
  408. name: "oAuth with access token and dynamic expire",
  409. props: map[string]interface{}{
  410. "url": "http://localhost:52345/",
  411. "headers": map[string]interface{}{
  412. "Authorization": "Bearer {{.token}}",
  413. },
  414. "oAuth": map[string]interface{}{
  415. "access": map[string]interface{}{
  416. "url": "http://localhost:52345/token",
  417. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  418. "expire": "{{.expires}}",
  419. },
  420. },
  421. },
  422. config: &RawConf{
  423. Url: "http://localhost:52345/",
  424. ResendUrl: "http://localhost:52345/",
  425. Method: http.MethodGet,
  426. Interval: DefaultInterval,
  427. Timeout: DefaultTimeout,
  428. BodyType: "none",
  429. ResponseType: "code",
  430. InsecureSkipVerify: true,
  431. Headers: map[string]interface{}{
  432. "Authorization": "Bearer {{.token}}",
  433. },
  434. HeadersMap: map[string]string{
  435. "Authorization": "Bearer {{.token}}",
  436. },
  437. OAuth: map[string]map[string]interface{}{
  438. "access": {
  439. "url": "http://localhost:52345/token",
  440. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  441. "expire": "{{.expires}}",
  442. },
  443. },
  444. },
  445. accessConf: &AccessTokenConf{
  446. Url: "http://localhost:52345/token",
  447. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  448. Expire: "{{.expires}}",
  449. ExpireInSecond: 36000,
  450. },
  451. tokens: map[string]interface{}{
  452. "token": DefaultToken,
  453. "refresh_token": RefreshToken,
  454. "client_id": "test",
  455. "expires": float64(36000),
  456. },
  457. },
  458. {
  459. name: "oAuth with access token and refresh token",
  460. props: map[string]interface{}{
  461. "url": "http://localhost:52345/",
  462. "headers": map[string]interface{}{
  463. "Authorization": "Bearer {{.token}}",
  464. },
  465. "oAuth": map[string]interface{}{
  466. "access": map[string]interface{}{
  467. "url": "http://localhost:52345/token",
  468. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  469. "expire": "3600",
  470. },
  471. "refresh": map[string]interface{}{
  472. "url": "http://localhost:52345/refresh",
  473. "headers": map[string]interface{}{
  474. "Authorization": "Bearer {{.token}}",
  475. "RefreshToken": "{{.refresh_token}}",
  476. },
  477. },
  478. },
  479. },
  480. config: &RawConf{
  481. Url: "http://localhost:52345/",
  482. ResendUrl: "http://localhost:52345/",
  483. Method: http.MethodGet,
  484. Interval: DefaultInterval,
  485. Timeout: DefaultTimeout,
  486. BodyType: "none",
  487. ResponseType: "code",
  488. InsecureSkipVerify: true,
  489. Headers: map[string]interface{}{
  490. "Authorization": "Bearer {{.token}}",
  491. },
  492. HeadersMap: map[string]string{
  493. "Authorization": "Bearer {{.token}}",
  494. },
  495. OAuth: map[string]map[string]interface{}{
  496. "access": {
  497. "url": "http://localhost:52345/token",
  498. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  499. "expire": "3600",
  500. },
  501. "refresh": {
  502. "url": "http://localhost:52345/refresh",
  503. "headers": map[string]interface{}{
  504. "Authorization": "Bearer {{.token}}",
  505. "RefreshToken": "{{.refresh_token}}",
  506. },
  507. },
  508. },
  509. },
  510. accessConf: &AccessTokenConf{
  511. Url: "http://localhost:52345/token",
  512. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  513. Expire: "3600",
  514. ExpireInSecond: 3600,
  515. },
  516. refreshConf: &RefreshTokenConf{
  517. Url: "http://localhost:52345/refresh",
  518. Headers: map[string]string{
  519. "Authorization": "Bearer {{.token}}",
  520. "RefreshToken": "{{.refresh_token}}",
  521. },
  522. },
  523. tokens: map[string]interface{}{
  524. "token": DefaultToken,
  525. "refresh_token": RefreshToken,
  526. "client_id": "test",
  527. "expires": float64(36000),
  528. },
  529. },
  530. // Wrong auth configs
  531. {
  532. name: "oAuth wrong access token config",
  533. props: map[string]interface{}{
  534. "url": "http://localhost:52345/",
  535. "headers": map[string]interface{}{
  536. "Authorization": "Bearer {{.token}}",
  537. },
  538. "oAuth": map[string]interface{}{
  539. "access": map[string]interface{}{
  540. "url": "http://localhost:52345/token",
  541. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  542. "expire": 3600,
  543. },
  544. },
  545. },
  546. err: errors.New("fail to parse the access properties of oAuth: 1 error(s) decoding:\n\n* 'expire' expected type 'string', got unconvertible type 'int', value: '3600'"),
  547. },
  548. {
  549. name: "oAuth wrong access url",
  550. props: map[string]interface{}{
  551. "url": "http://localhost:52345/",
  552. "headers": map[string]interface{}{
  553. "Authorization": "Bearer {{.token}}",
  554. },
  555. "oAuth": map[string]interface{}{
  556. "access": map[string]interface{}{
  557. "url": "",
  558. },
  559. },
  560. },
  561. config: &RawConf{
  562. Url: "http://localhost:52345/",
  563. ResendUrl: "http://localhost:52345/",
  564. Method: http.MethodGet,
  565. Interval: DefaultInterval,
  566. Timeout: DefaultTimeout,
  567. BodyType: "none",
  568. ResponseType: "code",
  569. InsecureSkipVerify: true,
  570. Headers: map[string]interface{}{
  571. "Authorization": "Bearer {{.token}}",
  572. },
  573. HeadersMap: map[string]string{
  574. "Authorization": "Bearer {{.token}}",
  575. },
  576. },
  577. },
  578. {
  579. name: "oAuth miss access",
  580. props: map[string]interface{}{
  581. "url": "http://localhost:52345/",
  582. "headers": map[string]interface{}{
  583. "Authorization": "Bearer {{.token}}",
  584. },
  585. "oAuth": map[string]interface{}{
  586. "refresh": map[string]interface{}{
  587. "url": "http://localhost:52345/",
  588. },
  589. },
  590. },
  591. err: errors.New("if setting oAuth, `access` property is required"),
  592. },
  593. {
  594. name: "oAuth wrong refresh token config",
  595. props: map[string]interface{}{
  596. "url": "http://localhost:52345/",
  597. "headers": map[string]interface{}{
  598. "Authorization": "Bearer {{.token}}",
  599. },
  600. "oAuth": map[string]interface{}{
  601. "access": map[string]interface{}{
  602. "url": "http://localhost:52345/token",
  603. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  604. "expire": "3600",
  605. },
  606. "refresh": map[string]interface{}{
  607. "url": 1202,
  608. },
  609. },
  610. },
  611. accessConf: &AccessTokenConf{
  612. Url: "http://localhost:52345/token",
  613. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  614. Expire: "3600",
  615. ExpireInSecond: 3600,
  616. },
  617. err: errors.New("fail to parse the refresh token properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '1202'"),
  618. },
  619. {
  620. name: "oAuth refresh token missing url",
  621. props: map[string]interface{}{
  622. "url": "http://localhost:52345/",
  623. "headers": map[string]interface{}{
  624. "Authorization": "Bearer {{.token}}",
  625. },
  626. "oAuth": map[string]interface{}{
  627. "access": map[string]interface{}{
  628. "url": "http://localhost:52345/token",
  629. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  630. "expire": "3600",
  631. },
  632. },
  633. },
  634. config: &RawConf{
  635. Url: "http://localhost:52345/",
  636. ResendUrl: "http://localhost:52345/",
  637. Method: http.MethodGet,
  638. Interval: DefaultInterval,
  639. Timeout: DefaultTimeout,
  640. BodyType: "none",
  641. ResponseType: "code",
  642. InsecureSkipVerify: true,
  643. Headers: map[string]interface{}{
  644. "Authorization": "Bearer {{.token}}",
  645. },
  646. HeadersMap: map[string]string{
  647. "Authorization": "Bearer {{.token}}",
  648. },
  649. OAuth: map[string]map[string]interface{}{
  650. "access": {
  651. "url": "http://localhost:52345/token",
  652. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  653. "expire": "3600",
  654. },
  655. },
  656. },
  657. accessConf: &AccessTokenConf{
  658. Url: "http://localhost:52345/token",
  659. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  660. Expire: "3600",
  661. ExpireInSecond: 3600,
  662. },
  663. tokens: map[string]interface{}{
  664. "token": DefaultToken,
  665. "refresh_token": RefreshToken,
  666. "client_id": "test",
  667. "expires": float64(36000),
  668. },
  669. },
  670. // oAuth authentication flow errors
  671. {
  672. name: "oAuth auth error",
  673. props: map[string]interface{}{
  674. "url": "http://localhost:52345/",
  675. "headers": map[string]interface{}{
  676. "Authorization": "Bearer {{.token}}",
  677. },
  678. "oAuth": map[string]interface{}{
  679. "access": map[string]interface{}{
  680. "url": "http://localhost:52345/token",
  681. "body": "{\"username\": \"admin\",\"password\": \"1234\"}",
  682. "expire": "3600",
  683. },
  684. },
  685. },
  686. config: &RawConf{
  687. Url: "http://localhost:52345/",
  688. ResendUrl: "http://localhost:52345/",
  689. Method: http.MethodGet,
  690. Interval: DefaultInterval,
  691. Timeout: DefaultTimeout,
  692. BodyType: "json",
  693. ResponseType: "code",
  694. InsecureSkipVerify: true,
  695. Headers: map[string]string{
  696. "Authorization": "Bearer {{.token}}",
  697. },
  698. OAuth: map[string]map[string]interface{}{
  699. "access": {
  700. "url": "http://localhost:52345/token",
  701. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  702. "expire": "3600",
  703. },
  704. },
  705. },
  706. accessConf: &AccessTokenConf{
  707. Url: "http://localhost:52345/token",
  708. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  709. Expire: "3600",
  710. ExpireInSecond: 3600,
  711. },
  712. tokens: map[string]interface{}{
  713. "token": DefaultToken,
  714. "refresh_token": RefreshToken,
  715. "client_id": "test",
  716. "expires": float64(36000),
  717. },
  718. err: errors.New("fail to authorize by oAuth: Cannot parse access token response to json: http return code error: 400"),
  719. },
  720. {
  721. name: "oAuth refresh error",
  722. props: map[string]interface{}{
  723. "url": "http://localhost:52345/",
  724. "headers": map[string]interface{}{
  725. "Authorization": "Bearer {{.token}}",
  726. },
  727. "oAuth": map[string]interface{}{
  728. "access": map[string]interface{}{
  729. "url": "http://localhost:52345/token",
  730. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  731. "expire": "3600",
  732. },
  733. "refresh": map[string]interface{}{
  734. "url": "http://localhost:52345/refresh",
  735. "headers": map[string]interface{}{
  736. "Authorization": "Bearer {{.token}}",
  737. "RefreshToken": "{{.token}}",
  738. },
  739. },
  740. },
  741. },
  742. err: errors.New("fail to authorize by oAuth: Cannot parse refresh token response to json: http return code error: 400"),
  743. },
  744. {
  745. name: "oAuth wrong access expire template",
  746. props: map[string]interface{}{
  747. "url": "http://localhost:52345/",
  748. "headers": map[string]interface{}{
  749. "Authorization": "Bearer {{.token}}",
  750. },
  751. "oAuth": map[string]interface{}{
  752. "access": map[string]interface{}{
  753. "url": "http://localhost:52345/token",
  754. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  755. "expire": "{{..expp}}",
  756. },
  757. },
  758. },
  759. err: errors.New("fail to authorize by oAuth: fail to parse the expire time for access token: template: sink:1: unexpected . after term \".\""),
  760. },
  761. {
  762. name: "oAuth wrong access expire type",
  763. props: map[string]interface{}{
  764. "url": "http://localhost:52345/",
  765. "headers": map[string]interface{}{
  766. "Authorization": "Bearer {{.token}}",
  767. },
  768. "oAuth": map[string]interface{}{
  769. "access": map[string]interface{}{
  770. "url": "http://localhost:52345/token",
  771. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  772. "expire": "{{.token}}",
  773. },
  774. },
  775. },
  776. err: errors.New("fail to authorize by oAuth: fail to covert the expire time privatisation for access token: cannot convert string(privatisation) to int"),
  777. },
  778. {
  779. name: "oAuth wrong access url",
  780. props: map[string]interface{}{
  781. "url": "http://localhost:52345/",
  782. "headers": map[string]interface{}{
  783. "Authorization": "Bearer {{.token}}",
  784. },
  785. "oAuth": map[string]interface{}{
  786. "access": map[string]interface{}{
  787. "url": "http:localhost:52345/token",
  788. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  789. "expire": "{{.token}}",
  790. },
  791. },
  792. },
  793. err: errors.New("fail to authorize by oAuth: fail to get access token: Post \"http:localhost:52345/token\": http: no Host in request URL"),
  794. },
  795. {
  796. name: "oAuth wrong refresh header template",
  797. props: map[string]interface{}{
  798. "url": "http://localhost:52345/",
  799. "headers": map[string]interface{}{
  800. "Authorization": "Bearer {{.token}}",
  801. },
  802. "oAuth": map[string]interface{}{
  803. "access": map[string]interface{}{
  804. "url": "http://localhost:52345/token",
  805. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  806. "expire": "3600",
  807. },
  808. "refresh": map[string]interface{}{
  809. "url": "http://localhost:52345/refresh",
  810. "headers": map[string]interface{}{
  811. "Authorization": "Bearer {{.token}}",
  812. "RefreshToken": "{{..token}}",
  813. },
  814. },
  815. },
  816. },
  817. err: errors.New("fail to authorize by oAuth: fail to parse the header for refresh token request RefreshToken: template: sink:1: unexpected . after term \".\""),
  818. },
  819. {
  820. name: "oAuth wrong refresh url",
  821. props: map[string]interface{}{
  822. "url": "http://localhost:52345/",
  823. "headers": map[string]interface{}{
  824. "Authorization": "Bearer {{.token}}",
  825. },
  826. "oAuth": map[string]interface{}{
  827. "access": map[string]interface{}{
  828. "url": "http://localhost:52345/token",
  829. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  830. "expire": "3600",
  831. },
  832. "refresh": map[string]interface{}{
  833. "url": "http:localhost:52345/refresh2",
  834. "headers": map[string]interface{}{
  835. "Authorization": "Bearer {{.token}}",
  836. "RefreshToken": "{{.token}}",
  837. },
  838. },
  839. },
  840. },
  841. err: errors.New("fail to authorize by oAuth: fail to get refresh token: Post \"http:localhost:52345/refresh2\": http: no Host in request URL"),
  842. },
  843. }
  844. server := mockAuthServer()
  845. server.Start()
  846. defer server.Close()
  847. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  848. for i, tt := range tests {
  849. t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
  850. r := &PullSource{}
  851. err := r.Configure("", tt.props)
  852. if err != nil {
  853. if tt.err == nil {
  854. t.Errorf("Expected error: %v", err)
  855. } else {
  856. if err.Error() != tt.err.Error() {
  857. t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
  858. }
  859. }
  860. return
  861. }
  862. if !reflect.DeepEqual(r.config, tt.config) {
  863. t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
  864. }
  865. if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
  866. t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
  867. }
  868. if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
  869. t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
  870. }
  871. if !reflect.DeepEqual(r.tokens, tt.tokens) {
  872. t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
  873. }
  874. })
  875. }
  876. }
  877. func TestPullWithAuth(t *testing.T) {
  878. conf.IsTesting = false
  879. conf.InitClock()
  880. r := &PullSource{}
  881. server := mockAuthServer()
  882. server.Start()
  883. defer server.Close()
  884. err := r.Configure("data", map[string]interface{}{
  885. "url": "http://localhost:52345/",
  886. "interval": 100,
  887. "headers": map[string]interface{}{
  888. "Authorization": "Bearer {{.token}}",
  889. },
  890. "oAuth": map[string]interface{}{
  891. "access": map[string]interface{}{
  892. "url": "http://localhost:52345/token",
  893. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  894. "expire": "10",
  895. },
  896. "refresh": map[string]interface{}{
  897. "url": "http://localhost:52345/refresh",
  898. "headers": map[string]interface{}{
  899. "Authorization": "Bearer {{.token}}",
  900. "RefreshToken": "{{.refresh_token}}",
  901. },
  902. },
  903. },
  904. })
  905. if err != nil {
  906. t.Errorf(err.Error())
  907. return
  908. }
  909. mc := conf.Clock
  910. exp := []api.SourceTuple{
  911. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}, mc.Now()),
  912. }
  913. mock.TestSourceOpen(r, exp, t)
  914. }
  915. func TestPullIncremental(t *testing.T) {
  916. conf.IsTesting = false
  917. conf.InitClock()
  918. r := &PullSource{}
  919. server := mockAuthServer()
  920. server.Start()
  921. defer server.Close()
  922. err := r.Configure("data2", map[string]interface{}{
  923. "url": "http://localhost:52345/",
  924. "interval": 100,
  925. "incremental": true,
  926. "responseType": "body",
  927. })
  928. if err != nil {
  929. t.Errorf(err.Error())
  930. return
  931. }
  932. mc := conf.Clock
  933. exp := []api.SourceTuple{
  934. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  935. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  936. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  937. }
  938. mock.TestSourceOpen(r, exp, t)
  939. }
  940. func TestPullJsonList(t *testing.T) {
  941. conf.IsTesting = false
  942. conf.InitClock()
  943. r := &PullSource{}
  944. server := mockAuthServer()
  945. server.Start()
  946. defer server.Close()
  947. err := r.Configure("data3", map[string]interface{}{
  948. "url": "http://localhost:52345/",
  949. "interval": 100,
  950. "responseType": "body",
  951. })
  952. if err != nil {
  953. t.Errorf(err.Error())
  954. return
  955. }
  956. mc := conf.Clock
  957. exp := []api.SourceTuple{
  958. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  959. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  960. }
  961. mock.TestSourceOpen(r, exp, t)
  962. }
  963. func TestPullUrlTimeRange(t *testing.T) {
  964. r := &PullSource{}
  965. server := mockAuthServer()
  966. server.Start()
  967. defer server.Close()
  968. err := r.Configure("", map[string]interface{}{
  969. "url": "http://localhost:52345/data4?device=d1&start={{.LastPullTime}}&end={{.PullTime}}",
  970. "interval": 110,
  971. "responseType": "body",
  972. })
  973. if err != nil {
  974. t.Errorf(err.Error())
  975. return
  976. }
  977. // Mock time
  978. mockclock.ResetClock(143)
  979. exp := []api.SourceTuple{
  980. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(43), "temperature": float64(33)}}, map[string]interface{}{}, time.UnixMilli(143)),
  981. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(53), "temperature": float64(43)}}, map[string]interface{}{}, time.UnixMilli(253)),
  982. }
  983. c := mockclock.GetMockClock()
  984. go func() {
  985. time.Sleep(10 * time.Millisecond)
  986. c.Add(350 * time.Millisecond)
  987. }()
  988. mock.TestSourceOpen(r, exp, t)
  989. }
  990. func TestPullBodyTimeRange(t *testing.T) {
  991. r := &PullSource{}
  992. server := mockAuthServer()
  993. server.Start()
  994. defer server.Close()
  995. err := r.Configure("data5", map[string]interface{}{
  996. "url": "http://localhost:52345/",
  997. "interval": 110,
  998. "responseType": "body",
  999. "method": "POST",
  1000. "body": `{"device": "d1", "start": {{.LastPullTime}}, "end": {{.PullTime}}}`,
  1001. })
  1002. if err != nil {
  1003. t.Errorf(err.Error())
  1004. return
  1005. }
  1006. // Mock time
  1007. mockclock.ResetClock(143)
  1008. exp := []api.SourceTuple{
  1009. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(43), "temperature": float64(33)}}, map[string]interface{}{}, time.UnixMilli(143)),
  1010. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(53), "temperature": float64(43)}}, map[string]interface{}{}, time.UnixMilli(253)),
  1011. }
  1012. c := mockclock.GetMockClock()
  1013. go func() {
  1014. time.Sleep(10 * time.Millisecond)
  1015. c.Add(350 * time.Millisecond)
  1016. }()
  1017. mock.TestSourceOpen(r, exp, t)
  1018. }
  1019. func TestPullErrorTest(t *testing.T) {
  1020. conf.IsTesting = false
  1021. conf.InitClock()
  1022. tests := []struct {
  1023. name string
  1024. conf map[string]interface{}
  1025. exp []api.SourceTuple
  1026. }{
  1027. {
  1028. name: "wrong url template",
  1029. conf: map[string]interface{}{"url": "http://localhost:52345/data4?device=d1&start={{.lastPullTime}}&end={{.PullTime}", "interval": 10},
  1030. exp: []api.SourceTuple{
  1031. &xsql.ErrorSourceTuple{
  1032. Error: errors.New("parse url http://localhost:52345/data4?device=d1&start={{.lastPullTime}}&end={{.PullTime} error template: sink:1: bad character U+007D '}'"),
  1033. },
  1034. },
  1035. }, {
  1036. name: "wrong header template",
  1037. conf: map[string]interface{}{"url": "http://localhost:52345/data4", "interval": 10, "HeadersTemplate": "\"Authorization\": \"Bearer {{.aatoken}}"},
  1038. exp: []api.SourceTuple{
  1039. &xsql.ErrorSourceTuple{
  1040. Error: errors.New("parse headers error parsed header template is not json: \"Authorization\": \"Bearer <no value>"),
  1041. },
  1042. },
  1043. }, {
  1044. name: "wrong body template",
  1045. conf: map[string]interface{}{"url": "http://localhost:52345/data4", "interval": 10, "body": `{"device": "d1", "start": {{.LastPullTime}}, "end": {{.pullTime}}}`},
  1046. exp: []api.SourceTuple{
  1047. &xsql.ErrorSourceTuple{
  1048. Error: errors.New("parse body {\"device\": \"d1\", \"start\": {{.LastPullTime}}, \"end\": {{.pullTime}}} error template: sink:1:54: executing \"sink\" at <.pullTime>: can't evaluate field pullTime in type *http.pullTimeMeta"),
  1049. },
  1050. },
  1051. }, {
  1052. name: "wrong response",
  1053. conf: map[string]interface{}{"url": "http://localhost:52345/aa/data4", "interval": 10},
  1054. exp: []api.SourceTuple{
  1055. &xsql.ErrorSourceTuple{
  1056. Error: errors.New("parse response error http return code error: 404"),
  1057. },
  1058. },
  1059. }, {
  1060. name: "wrong request",
  1061. conf: map[string]interface{}{"url": "http://localhost:52345/aa/data4", "interval": 10, "bodyType": "form", "body": "ddd"},
  1062. exp: []api.SourceTuple{
  1063. &xsql.ErrorSourceTuple{
  1064. Error: errors.New("send request error invalid content: ddd"),
  1065. },
  1066. },
  1067. },
  1068. }
  1069. server := mockAuthServer()
  1070. server.Start()
  1071. defer server.Close()
  1072. for _, test := range tests {
  1073. t.Run(test.name, func(t *testing.T) {
  1074. r := &PullSource{}
  1075. err := r.Configure("", test.conf)
  1076. assert.NoError(t, err)
  1077. mock.TestSourceOpen(r, test.exp, t)
  1078. })
  1079. }
  1080. }