httppull_source_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package http
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "github.com/gorilla/mux"
  20. "github.com/lf-edge/ekuiper/internal/io/mock"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "path/filepath"
  26. "reflect"
  27. "strconv"
  28. "testing"
  29. )
  30. func jsonOut(w http.ResponseWriter, err error, out interface{}) {
  31. w.Header().Add("Content-Type", "application/json")
  32. enc := json.NewEncoder(w)
  33. err = enc.Encode(out)
  34. // Problems encoding
  35. if err != nil {
  36. http.Error(w, err.Error(), http.StatusBadRequest)
  37. }
  38. }
  39. const (
  40. DefaultToken = "privatisation"
  41. RefreshToken = "privaterefresh"
  42. )
  43. // mock http auth server
  44. func mockAuthServer() *httptest.Server {
  45. l, _ := net.Listen("tcp", "127.0.0.1:52345")
  46. router := mux.NewRouter()
  47. i := 0
  48. router.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) {
  49. body := &struct {
  50. Username string `json:"username"`
  51. Password string `json:"password"`
  52. }{}
  53. err := json.NewDecoder(r.Body).Decode(body)
  54. if err != nil {
  55. http.Error(w, err.Error(), http.StatusBadRequest)
  56. }
  57. if body.Username != "admin" || body.Password != "0000" {
  58. http.Error(w, "invalid username or password", http.StatusBadRequest)
  59. }
  60. out := &struct {
  61. Token string `json:"token"`
  62. RefreshToken string `json:"refresh_token"`
  63. ClientId string `json:"client_id"`
  64. Expires int64 `json:"expires"`
  65. }{
  66. Token: DefaultToken,
  67. RefreshToken: RefreshToken,
  68. ClientId: "test",
  69. Expires: 36000,
  70. }
  71. jsonOut(w, err, out)
  72. }).Methods(http.MethodPost)
  73. router.HandleFunc("/refresh", func(w http.ResponseWriter, r *http.Request) {
  74. token := r.Header.Get("Authorization")
  75. if token != "Bearer "+DefaultToken {
  76. http.Error(w, "invalid token", http.StatusBadRequest)
  77. }
  78. rt := r.Header.Get("RefreshToken")
  79. if rt != RefreshToken {
  80. http.Error(w, "invalid refresh token", http.StatusBadRequest)
  81. }
  82. out := &struct {
  83. Token string `json:"token"`
  84. RefreshToken string `json:"refresh_token"`
  85. ClientId string `json:"client_id"`
  86. Expires int64 `json:"expires"`
  87. }{
  88. Token: DefaultToken,
  89. RefreshToken: RefreshToken,
  90. ClientId: "test",
  91. Expires: 36000,
  92. }
  93. jsonOut(w, nil, out)
  94. }).Methods(http.MethodPost)
  95. router.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
  96. token := r.Header.Get("Authorization")
  97. if token != "Bearer "+DefaultToken {
  98. http.Error(w, "invalid token", http.StatusBadRequest)
  99. }
  100. out := &struct {
  101. DeviceId string `json:"device_id"`
  102. Temperature float64 `json:"temperature"`
  103. Humidity float64 `json:"humidity"`
  104. }{
  105. DeviceId: "device1",
  106. Temperature: 25.5,
  107. Humidity: 60.0,
  108. }
  109. jsonOut(w, nil, out)
  110. }).Methods(http.MethodGet)
  111. // Return same data for 3 times
  112. router.HandleFunc("/data2", func(w http.ResponseWriter, r *http.Request) {
  113. out := &struct {
  114. Code int `json:"code"`
  115. Data struct {
  116. DeviceId string `json:"device_id"`
  117. Temperature float64 `json:"temperature"`
  118. Humidity float64 `json:"humidity"`
  119. } `json:"data"`
  120. }{
  121. Code: 200,
  122. Data: struct {
  123. DeviceId string `json:"device_id"`
  124. Temperature float64 `json:"temperature"`
  125. Humidity float64 `json:"humidity"`
  126. }{
  127. DeviceId: "device" + strconv.Itoa(i/3),
  128. Temperature: 25.5,
  129. Humidity: 60.0,
  130. },
  131. }
  132. i++
  133. jsonOut(w, nil, out)
  134. }).Methods(http.MethodGet)
  135. server := httptest.NewUnstartedServer(router)
  136. server.Listener.Close()
  137. server.Listener = l
  138. return server
  139. }
  140. var wrongPath, _ = filepath.Abs("/tmp/wrong")
  141. // Test configure to properties
  142. func TestConfigure(t *testing.T) {
  143. tests := []struct {
  144. name string
  145. props map[string]interface{}
  146. err error
  147. config *HTTPPullConf
  148. accessConf *AccessTokenConf
  149. refreshConf *RefreshTokenConf
  150. tokens map[string]interface{}
  151. }{
  152. {
  153. name: "default",
  154. props: map[string]interface{}{
  155. "incremental": true,
  156. "url": "http://localhost:9090/",
  157. },
  158. config: &HTTPPullConf{
  159. Incremental: true,
  160. Url: "http://localhost:9090/",
  161. Method: http.MethodGet,
  162. Interval: DefaultInterval,
  163. Timeout: DefaultTimeout,
  164. BodyType: "json",
  165. ResponseType: "code",
  166. InsecureSkipVerify: true,
  167. Headers: map[string]string{},
  168. },
  169. },
  170. // Test wrong properties
  171. {
  172. name: "wrong props",
  173. props: map[string]interface{}{
  174. "incremental": true,
  175. "url": 123,
  176. },
  177. err: fmt.Errorf("fail to parse the properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '123'"),
  178. },
  179. {
  180. name: "empty url",
  181. props: map[string]interface{}{
  182. "incremental": true,
  183. "url": "",
  184. },
  185. err: fmt.Errorf("url is required"),
  186. },
  187. {
  188. name: "wrong method",
  189. props: map[string]interface{}{
  190. "url": "http://localhost:9090/",
  191. "method": "wrong",
  192. },
  193. err: fmt.Errorf("Not supported HTTP method wrong."),
  194. },
  195. {
  196. name: "wrong bodytype",
  197. props: map[string]interface{}{
  198. "url": "http://localhost:9090/",
  199. "bodyType": "wrong",
  200. },
  201. err: fmt.Errorf("Not valid body type value wrong."),
  202. },
  203. {
  204. name: "wrong response type",
  205. props: map[string]interface{}{
  206. "url": "http://localhost:9090/",
  207. "responseType": "wrong",
  208. },
  209. err: fmt.Errorf("Not valid response type value wrong."),
  210. },
  211. {
  212. name: "wrong url",
  213. props: map[string]interface{}{
  214. "url": "http:/localhost:9090/",
  215. },
  216. err: fmt.Errorf("Invalid url, host not found"),
  217. },
  218. {
  219. name: "wrong interval",
  220. props: map[string]interface{}{
  221. "url": "http:/localhost:9090/",
  222. "interval": -2,
  223. },
  224. err: fmt.Errorf("interval must be greater than 0"),
  225. },
  226. {
  227. name: "wrong timeout",
  228. props: map[string]interface{}{
  229. "url": "http:/localhost:9090/",
  230. "timeout": -2,
  231. },
  232. err: fmt.Errorf("timeout must be greater than or equal to 0"),
  233. },
  234. {
  235. name: "wrong tls",
  236. props: map[string]interface{}{
  237. "url": "http://localhost:9090/",
  238. "certificationPath": wrongPath,
  239. },
  240. err: fmt.Errorf(fmt.Sprintf("stat %s: no such file or directory", wrongPath)),
  241. },
  242. // Test oAuth
  243. {
  244. name: "oAuth with access token and constant expire",
  245. props: map[string]interface{}{
  246. "url": "http://localhost:52345/",
  247. "headers": map[string]interface{}{
  248. "Authorization": "Bearer {{.token}}",
  249. },
  250. "oAuth": map[string]interface{}{
  251. "access": map[string]interface{}{
  252. "url": "http://localhost:52345/token",
  253. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  254. "expire": "3600",
  255. },
  256. },
  257. },
  258. config: &HTTPPullConf{
  259. Url: "http://localhost:52345/",
  260. Method: http.MethodGet,
  261. Interval: DefaultInterval,
  262. Timeout: DefaultTimeout,
  263. BodyType: "json",
  264. ResponseType: "code",
  265. InsecureSkipVerify: true,
  266. Headers: map[string]string{
  267. "Authorization": "Bearer {{.token}}",
  268. },
  269. OAuth: map[string]map[string]interface{}{
  270. "access": {
  271. "url": "http://localhost:52345/token",
  272. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  273. "expire": "3600",
  274. },
  275. },
  276. },
  277. accessConf: &AccessTokenConf{
  278. Url: "http://localhost:52345/token",
  279. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  280. Expire: "3600",
  281. ExpireInSecond: 3600,
  282. },
  283. tokens: map[string]interface{}{
  284. "token": DefaultToken,
  285. "refresh_token": RefreshToken,
  286. "client_id": "test",
  287. "expires": float64(36000),
  288. },
  289. },
  290. {
  291. name: "oAuth with access token and dynamic expire",
  292. props: map[string]interface{}{
  293. "url": "http://localhost:52345/",
  294. "headers": map[string]interface{}{
  295. "Authorization": "Bearer {{.token}}",
  296. },
  297. "oAuth": map[string]interface{}{
  298. "access": map[string]interface{}{
  299. "url": "http://localhost:52345/token",
  300. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  301. "expire": "{{.expires}}",
  302. },
  303. },
  304. },
  305. config: &HTTPPullConf{
  306. Url: "http://localhost:52345/",
  307. Method: http.MethodGet,
  308. Interval: DefaultInterval,
  309. Timeout: DefaultTimeout,
  310. BodyType: "json",
  311. ResponseType: "code",
  312. InsecureSkipVerify: true,
  313. Headers: map[string]string{
  314. "Authorization": "Bearer {{.token}}",
  315. },
  316. OAuth: map[string]map[string]interface{}{
  317. "access": {
  318. "url": "http://localhost:52345/token",
  319. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  320. "expire": "{{.expires}}",
  321. },
  322. },
  323. },
  324. accessConf: &AccessTokenConf{
  325. Url: "http://localhost:52345/token",
  326. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  327. Expire: "{{.expires}}",
  328. ExpireInSecond: 36000,
  329. },
  330. tokens: map[string]interface{}{
  331. "token": DefaultToken,
  332. "refresh_token": RefreshToken,
  333. "client_id": "test",
  334. "expires": float64(36000),
  335. },
  336. },
  337. {
  338. name: "oAuth with access token and refresh token",
  339. props: map[string]interface{}{
  340. "url": "http://localhost:52345/",
  341. "headers": map[string]interface{}{
  342. "Authorization": "Bearer {{.token}}",
  343. },
  344. "oAuth": map[string]interface{}{
  345. "access": map[string]interface{}{
  346. "url": "http://localhost:52345/token",
  347. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  348. "expire": "3600",
  349. },
  350. "refresh": map[string]interface{}{
  351. "url": "http://localhost:52345/refresh",
  352. "headers": map[string]interface{}{
  353. "Authorization": "Bearer {{.token}}",
  354. "RefreshToken": "{{.refresh_token}}",
  355. },
  356. },
  357. },
  358. },
  359. config: &HTTPPullConf{
  360. Url: "http://localhost:52345/",
  361. Method: http.MethodGet,
  362. Interval: DefaultInterval,
  363. Timeout: DefaultTimeout,
  364. BodyType: "json",
  365. ResponseType: "code",
  366. InsecureSkipVerify: true,
  367. Headers: map[string]string{
  368. "Authorization": "Bearer {{.token}}",
  369. },
  370. OAuth: map[string]map[string]interface{}{
  371. "access": {
  372. "url": "http://localhost:52345/token",
  373. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  374. "expire": "3600",
  375. },
  376. "refresh": {
  377. "url": "http://localhost:52345/refresh",
  378. "headers": map[string]interface{}{
  379. "Authorization": "Bearer {{.token}}",
  380. "RefreshToken": "{{.refresh_token}}",
  381. },
  382. },
  383. },
  384. },
  385. accessConf: &AccessTokenConf{
  386. Url: "http://localhost:52345/token",
  387. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  388. Expire: "3600",
  389. ExpireInSecond: 3600,
  390. },
  391. refreshConf: &RefreshTokenConf{
  392. Url: "http://localhost:52345/refresh",
  393. Headers: map[string]string{
  394. "Authorization": "Bearer {{.token}}",
  395. "RefreshToken": "{{.refresh_token}}",
  396. },
  397. },
  398. tokens: map[string]interface{}{
  399. "token": DefaultToken,
  400. "refresh_token": RefreshToken,
  401. "client_id": "test",
  402. "expires": float64(36000),
  403. },
  404. },
  405. // Wrong auth configs
  406. {
  407. name: "oAuth wrong access token config",
  408. props: map[string]interface{}{
  409. "url": "http://localhost:52345/",
  410. "headers": map[string]interface{}{
  411. "Authorization": "Bearer {{.token}}",
  412. },
  413. "oAuth": map[string]interface{}{
  414. "access": map[string]interface{}{
  415. "url": "http://localhost:52345/token",
  416. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  417. "expire": 3600,
  418. },
  419. },
  420. },
  421. err: errors.New("fail to parse the access properties of oAuth: 1 error(s) decoding:\n\n* 'expire' expected type 'string', got unconvertible type 'int', value: '3600'"),
  422. },
  423. {
  424. name: "oAuth wrong access url",
  425. props: map[string]interface{}{
  426. "url": "http://localhost:52345/",
  427. "headers": map[string]interface{}{
  428. "Authorization": "Bearer {{.token}}",
  429. },
  430. "oAuth": map[string]interface{}{
  431. "access": map[string]interface{}{
  432. "url": "",
  433. },
  434. },
  435. },
  436. err: errors.New("access token url is required"),
  437. },
  438. {
  439. name: "oAuth miss access",
  440. props: map[string]interface{}{
  441. "url": "http://localhost:52345/",
  442. "headers": map[string]interface{}{
  443. "Authorization": "Bearer {{.token}}",
  444. },
  445. "oAuth": map[string]interface{}{
  446. "refresh": map[string]interface{}{
  447. "url": "http://localhost:52345/",
  448. },
  449. },
  450. },
  451. err: errors.New("if setting oAuth, `access` property is required"),
  452. },
  453. {
  454. name: "oAuth wrong refresh token config",
  455. props: map[string]interface{}{
  456. "url": "http://localhost:52345/",
  457. "headers": map[string]interface{}{
  458. "Authorization": "Bearer {{.token}}",
  459. },
  460. "oAuth": map[string]interface{}{
  461. "access": map[string]interface{}{
  462. "url": "http://localhost:52345/token",
  463. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  464. "expire": "3600",
  465. },
  466. "refresh": map[string]interface{}{
  467. "url": 1202,
  468. },
  469. },
  470. },
  471. accessConf: &AccessTokenConf{
  472. Url: "http://localhost:52345/token",
  473. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  474. Expire: "3600",
  475. ExpireInSecond: 3600,
  476. },
  477. err: errors.New("fail to parse the refresh token properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '1202'"),
  478. },
  479. {
  480. name: "oAuth refresh token missing url",
  481. props: map[string]interface{}{
  482. "url": "http://localhost:52345/",
  483. "headers": map[string]interface{}{
  484. "Authorization": "Bearer {{.token}}",
  485. },
  486. "oAuth": map[string]interface{}{
  487. "access": map[string]interface{}{
  488. "url": "http://localhost:52345/token",
  489. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  490. "expire": "3600",
  491. },
  492. "refresh": map[string]interface{}{
  493. "url": "",
  494. },
  495. },
  496. },
  497. accessConf: &AccessTokenConf{
  498. Url: "http://localhost:52345/token",
  499. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  500. Expire: "3600",
  501. ExpireInSecond: 3600,
  502. },
  503. err: errors.New("refresh token url is required"),
  504. },
  505. // oAuth authentication flow errors
  506. {
  507. name: "oAuth auth error",
  508. props: map[string]interface{}{
  509. "url": "http://localhost:52345/",
  510. "headers": map[string]interface{}{
  511. "Authorization": "Bearer {{.token}}",
  512. },
  513. "oAuth": map[string]interface{}{
  514. "access": map[string]interface{}{
  515. "url": "http://localhost:52345/token",
  516. "body": "{\"username\": \"admin\",\"password\": \"1234\"}",
  517. "expire": "3600",
  518. },
  519. },
  520. },
  521. config: &HTTPPullConf{
  522. Url: "http://localhost:52345/",
  523. Method: http.MethodGet,
  524. Interval: DefaultInterval,
  525. Timeout: DefaultTimeout,
  526. BodyType: "json",
  527. ResponseType: "code",
  528. InsecureSkipVerify: true,
  529. Headers: map[string]string{
  530. "Authorization": "Bearer {{.token}}",
  531. },
  532. OAuth: map[string]map[string]interface{}{
  533. "access": {
  534. "url": "http://localhost:52345/token",
  535. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  536. "expire": "3600",
  537. },
  538. },
  539. },
  540. accessConf: &AccessTokenConf{
  541. Url: "http://localhost:52345/token",
  542. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  543. Expire: "3600",
  544. ExpireInSecond: 3600,
  545. },
  546. tokens: map[string]interface{}{
  547. "token": DefaultToken,
  548. "refresh_token": RefreshToken,
  549. "client_id": "test",
  550. "expires": float64(36000),
  551. },
  552. err: errors.New("fail to authorize by oAuth: Cannot parse access token response to json: http return code error: 400"),
  553. },
  554. {
  555. name: "oAuth refresh error",
  556. props: map[string]interface{}{
  557. "url": "http://localhost:52345/",
  558. "headers": map[string]interface{}{
  559. "Authorization": "Bearer {{.token}}",
  560. },
  561. "oAuth": map[string]interface{}{
  562. "access": map[string]interface{}{
  563. "url": "http://localhost:52345/token",
  564. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  565. "expire": "3600",
  566. },
  567. "refresh": map[string]interface{}{
  568. "url": "http://localhost:52345/refresh",
  569. "headers": map[string]interface{}{
  570. "Authorization": "Bearer {{.token}}",
  571. "RefreshToken": "{{.token}}",
  572. },
  573. },
  574. },
  575. },
  576. err: errors.New("fail to authorize by oAuth: Cannot parse refresh token response to json: http return code error: 400"),
  577. },
  578. {
  579. name: "oAuth wrong access expire template",
  580. props: map[string]interface{}{
  581. "url": "http://localhost:52345/",
  582. "headers": map[string]interface{}{
  583. "Authorization": "Bearer {{.token}}",
  584. },
  585. "oAuth": map[string]interface{}{
  586. "access": map[string]interface{}{
  587. "url": "http://localhost:52345/token",
  588. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  589. "expire": "{{..expp}}",
  590. },
  591. },
  592. },
  593. err: errors.New("fail to authorize by oAuth: fail to parse the expire time for access token: template: sink:1: unexpected . after term \".\""),
  594. },
  595. {
  596. name: "oAuth wrong access expire type",
  597. props: map[string]interface{}{
  598. "url": "http://localhost:52345/",
  599. "headers": map[string]interface{}{
  600. "Authorization": "Bearer {{.token}}",
  601. },
  602. "oAuth": map[string]interface{}{
  603. "access": map[string]interface{}{
  604. "url": "http://localhost:52345/token",
  605. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  606. "expire": "{{.token}}",
  607. },
  608. },
  609. },
  610. err: errors.New("fail to authorize by oAuth: fail to covert the expire time privatisation for access token: cannot convert string(privatisation) to int"),
  611. },
  612. {
  613. name: "oAuth wrong access url",
  614. props: map[string]interface{}{
  615. "url": "http://localhost:52345/",
  616. "headers": map[string]interface{}{
  617. "Authorization": "Bearer {{.token}}",
  618. },
  619. "oAuth": map[string]interface{}{
  620. "access": map[string]interface{}{
  621. "url": "http:localhost:52345/token",
  622. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  623. "expire": "{{.token}}",
  624. },
  625. },
  626. },
  627. err: errors.New("fail to authorize by oAuth: fail to get access token: Post \"http:localhost:52345/token\": http: no Host in request URL"),
  628. },
  629. {
  630. name: "oAuth wrong refresh header template",
  631. props: map[string]interface{}{
  632. "url": "http://localhost:52345/",
  633. "headers": map[string]interface{}{
  634. "Authorization": "Bearer {{.token}}",
  635. },
  636. "oAuth": map[string]interface{}{
  637. "access": map[string]interface{}{
  638. "url": "http://localhost:52345/token",
  639. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  640. "expire": "3600",
  641. },
  642. "refresh": map[string]interface{}{
  643. "url": "http://localhost:52345/refresh",
  644. "headers": map[string]interface{}{
  645. "Authorization": "Bearer {{.token}}",
  646. "RefreshToken": "{{..token}}",
  647. },
  648. },
  649. },
  650. },
  651. err: errors.New("fail to authorize by oAuth: fail to parse the header for refresh token request RefreshToken: template: sink:1: unexpected . after term \".\""),
  652. },
  653. {
  654. name: "oAuth wrong refresh url",
  655. props: map[string]interface{}{
  656. "url": "http://localhost:52345/",
  657. "headers": map[string]interface{}{
  658. "Authorization": "Bearer {{.token}}",
  659. },
  660. "oAuth": map[string]interface{}{
  661. "access": map[string]interface{}{
  662. "url": "http://localhost:52345/token",
  663. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  664. "expire": "3600",
  665. },
  666. "refresh": map[string]interface{}{
  667. "url": "http:localhost:52345/refresh2",
  668. "headers": map[string]interface{}{
  669. "Authorization": "Bearer {{.token}}",
  670. "RefreshToken": "{{.token}}",
  671. },
  672. },
  673. },
  674. },
  675. err: errors.New("fail to authorize by oAuth: fail to get refresh token: Post \"http:localhost:52345/refresh2\": http: no Host in request URL"),
  676. },
  677. }
  678. server := mockAuthServer()
  679. server.Start()
  680. defer server.Close()
  681. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  682. for i, tt := range tests {
  683. t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
  684. r := &HTTPPullSource{}
  685. err := r.Configure("", tt.props)
  686. if err != nil {
  687. if tt.err == nil {
  688. t.Errorf("Expected error: %v", err)
  689. } else {
  690. if err.Error() != tt.err.Error() {
  691. t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
  692. }
  693. }
  694. return
  695. }
  696. if !reflect.DeepEqual(r.config, tt.config) {
  697. t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
  698. }
  699. if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
  700. t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
  701. }
  702. if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
  703. t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
  704. }
  705. if !reflect.DeepEqual(r.tokens, tt.tokens) {
  706. t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
  707. }
  708. })
  709. }
  710. }
  711. func TestPullWithAuth(t *testing.T) {
  712. r := &HTTPPullSource{}
  713. server := mockAuthServer()
  714. server.Start()
  715. defer server.Close()
  716. err := r.Configure("data", map[string]interface{}{
  717. "url": "http://localhost:52345/",
  718. "interval": 100,
  719. "headers": map[string]interface{}{
  720. "Authorization": "Bearer {{.token}}",
  721. },
  722. "oAuth": map[string]interface{}{
  723. "access": map[string]interface{}{
  724. "url": "http://localhost:52345/token",
  725. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  726. "expire": "10",
  727. },
  728. "refresh": map[string]interface{}{
  729. "url": "http://localhost:52345/refresh",
  730. "headers": map[string]interface{}{
  731. "Authorization": "Bearer {{.token}}",
  732. "RefreshToken": "{{.refresh_token}}",
  733. },
  734. },
  735. },
  736. })
  737. if err != nil {
  738. t.Errorf(err.Error())
  739. return
  740. }
  741. exp := []api.SourceTuple{
  742. api.NewDefaultSourceTuple(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}),
  743. }
  744. mock.TestSourceOpen(r, exp, t)
  745. }
  746. func TestPullIncremental(t *testing.T) {
  747. r := &HTTPPullSource{}
  748. server := mockAuthServer()
  749. server.Start()
  750. defer server.Close()
  751. err := r.Configure("data2", map[string]interface{}{
  752. "url": "http://localhost:52345/",
  753. "interval": 100,
  754. "incremental": true,
  755. "responseType": "body",
  756. })
  757. if err != nil {
  758. t.Errorf(err.Error())
  759. return
  760. }
  761. exp := []api.SourceTuple{
  762. api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
  763. api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
  764. api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
  765. }
  766. mock.TestSourceOpen(r, exp, t)
  767. }