// httppull_source_test.go
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package http
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "github.com/gorilla/mux"
  20. "github.com/lf-edge/ekuiper/internal/io/mock"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "path/filepath"
  26. "reflect"
  27. "strconv"
  28. "testing"
  29. )
  30. func jsonOut(w http.ResponseWriter, err error, out interface{}) {
  31. w.Header().Add("Content-Type", "application/json")
  32. enc := json.NewEncoder(w)
  33. err = enc.Encode(out)
  34. // Problems encoding
  35. if err != nil {
  36. http.Error(w, err.Error(), http.StatusBadRequest)
  37. }
  38. }
  39. const (
  40. DefaultToken = "privatisation"
  41. RefreshToken = "privaterefresh"
  42. )
  43. // mock http auth server
  44. func mockAuthServer() *httptest.Server {
  45. l, _ := net.Listen("tcp", "127.0.0.1:52345")
  46. router := mux.NewRouter()
  47. i := 0
  48. router.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) {
  49. body := &struct {
  50. Username string `json:"username"`
  51. Password string `json:"password"`
  52. }{}
  53. err := json.NewDecoder(r.Body).Decode(body)
  54. if err != nil {
  55. http.Error(w, err.Error(), http.StatusBadRequest)
  56. }
  57. if body.Username != "admin" || body.Password != "0000" {
  58. http.Error(w, "invalid username or password", http.StatusBadRequest)
  59. }
  60. out := &struct {
  61. Token string `json:"token"`
  62. RefreshToken string `json:"refresh_token"`
  63. ClientId string `json:"client_id"`
  64. Expires int64 `json:"expires"`
  65. }{
  66. Token: DefaultToken,
  67. RefreshToken: RefreshToken,
  68. ClientId: "test",
  69. Expires: 36000,
  70. }
  71. jsonOut(w, err, out)
  72. }).Methods(http.MethodPost)
  73. router.HandleFunc("/refresh", func(w http.ResponseWriter, r *http.Request) {
  74. token := r.Header.Get("Authorization")
  75. if token != "Bearer "+DefaultToken {
  76. http.Error(w, "invalid token", http.StatusBadRequest)
  77. }
  78. rt := r.Header.Get("RefreshToken")
  79. if rt != RefreshToken {
  80. http.Error(w, "invalid refresh token", http.StatusBadRequest)
  81. }
  82. out := &struct {
  83. Token string `json:"token"`
  84. RefreshToken string `json:"refresh_token"`
  85. ClientId string `json:"client_id"`
  86. Expires int64 `json:"expires"`
  87. }{
  88. Token: DefaultToken,
  89. RefreshToken: RefreshToken,
  90. ClientId: "test",
  91. Expires: 36000,
  92. }
  93. jsonOut(w, nil, out)
  94. }).Methods(http.MethodPost)
  95. router.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
  96. token := r.Header.Get("Authorization")
  97. if token != "Bearer "+DefaultToken {
  98. http.Error(w, "invalid token", http.StatusBadRequest)
  99. }
  100. out := &struct {
  101. DeviceId string `json:"device_id"`
  102. Temperature float64 `json:"temperature"`
  103. Humidity float64 `json:"humidity"`
  104. }{
  105. DeviceId: "device1",
  106. Temperature: 25.5,
  107. Humidity: 60.0,
  108. }
  109. jsonOut(w, nil, out)
  110. }).Methods(http.MethodGet)
  111. // Return same data for 3 times
  112. router.HandleFunc("/data2", func(w http.ResponseWriter, r *http.Request) {
  113. out := &struct {
  114. Code int `json:"code"`
  115. Data struct {
  116. DeviceId string `json:"device_id"`
  117. Temperature float64 `json:"temperature"`
  118. Humidity float64 `json:"humidity"`
  119. } `json:"data"`
  120. }{
  121. Code: 200,
  122. Data: struct {
  123. DeviceId string `json:"device_id"`
  124. Temperature float64 `json:"temperature"`
  125. Humidity float64 `json:"humidity"`
  126. }{
  127. DeviceId: "device" + strconv.Itoa(i/3),
  128. Temperature: 25.5,
  129. Humidity: 60.0,
  130. },
  131. }
  132. i++
  133. jsonOut(w, nil, out)
  134. }).Methods(http.MethodGet)
  135. server := httptest.NewUnstartedServer(router)
  136. server.Listener.Close()
  137. server.Listener = l
  138. return server
  139. }
  140. var wrongPath, _ = filepath.Abs("/tmp/wrong")
  141. // Test configure to properties
  142. func TestConfigure(t *testing.T) {
  143. tests := []struct {
  144. name string
  145. props map[string]interface{}
  146. err error
  147. config *RawConf
  148. accessConf *AccessTokenConf
  149. refreshConf *RefreshTokenConf
  150. tokens map[string]interface{}
  151. }{
  152. {
  153. name: "default",
  154. props: map[string]interface{}{
  155. "incremental": true,
  156. "url": "http://localhost:9090/",
  157. },
  158. config: &RawConf{
  159. Incremental: true,
  160. Url: "http://localhost:9090/",
  161. Method: http.MethodGet,
  162. Interval: DefaultInterval,
  163. Timeout: DefaultTimeout,
  164. BodyType: "none",
  165. ResponseType: "code",
  166. InsecureSkipVerify: true,
  167. },
  168. },
  169. // Test wrong properties
  170. {
  171. name: "wrong props",
  172. props: map[string]interface{}{
  173. "incremental": true,
  174. "url": 123,
  175. },
  176. err: fmt.Errorf("fail to parse the properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '123'"),
  177. },
  178. {
  179. name: "empty url",
  180. props: map[string]interface{}{
  181. "incremental": true,
  182. "url": "",
  183. },
  184. err: fmt.Errorf("url is required"),
  185. },
  186. {
  187. name: "wrong method",
  188. props: map[string]interface{}{
  189. "url": "http://localhost:9090/",
  190. "method": "wrong",
  191. },
  192. err: fmt.Errorf("Not supported HTTP method wrong."),
  193. },
  194. {
  195. name: "wrong bodytype",
  196. props: map[string]interface{}{
  197. "url": "http://localhost:9090/",
  198. "bodyType": "wrong",
  199. },
  200. err: fmt.Errorf("Not valid body type value wrong."),
  201. },
  202. {
  203. name: "wrong response type",
  204. props: map[string]interface{}{
  205. "url": "http://localhost:9090/",
  206. "responseType": "wrong",
  207. },
  208. err: fmt.Errorf("Not valid response type value wrong."),
  209. },
  210. {
  211. name: "wrong url",
  212. props: map[string]interface{}{
  213. "url": "http:/localhost:9090/",
  214. },
  215. err: fmt.Errorf("Invalid url, host not found"),
  216. },
  217. {
  218. name: "wrong interval",
  219. props: map[string]interface{}{
  220. "url": "http:/localhost:9090/",
  221. "interval": -2,
  222. },
  223. err: fmt.Errorf("interval must be greater than 0"),
  224. },
  225. {
  226. name: "wrong timeout",
  227. props: map[string]interface{}{
  228. "url": "http:/localhost:9090/",
  229. "timeout": -2,
  230. },
  231. err: fmt.Errorf("timeout must be greater than or equal to 0"),
  232. },
  233. {
  234. name: "wrong tls",
  235. props: map[string]interface{}{
  236. "url": "http://localhost:9090/",
  237. "certificationPath": wrongPath,
  238. },
  239. err: fmt.Errorf(fmt.Sprintf("stat %s: no such file or directory", wrongPath)),
  240. },
  241. // Test oAuth
  242. {
  243. name: "oAuth with access token and constant expire",
  244. props: map[string]interface{}{
  245. "url": "http://localhost:52345/",
  246. "headers": map[string]interface{}{
  247. "Authorization": "Bearer {{.token}}",
  248. },
  249. "oAuth": map[string]interface{}{
  250. "access": map[string]interface{}{
  251. "url": "http://localhost:52345/token",
  252. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  253. "expire": "3600",
  254. },
  255. },
  256. },
  257. config: &RawConf{
  258. Url: "http://localhost:52345/",
  259. Method: http.MethodGet,
  260. Interval: DefaultInterval,
  261. Timeout: DefaultTimeout,
  262. BodyType: "none",
  263. ResponseType: "code",
  264. InsecureSkipVerify: true,
  265. Headers: map[string]interface{}{
  266. "Authorization": "Bearer {{.token}}",
  267. },
  268. HeadersMap: map[string]string{
  269. "Authorization": "Bearer {{.token}}",
  270. },
  271. OAuth: map[string]map[string]interface{}{
  272. "access": {
  273. "url": "http://localhost:52345/token",
  274. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  275. "expire": "3600",
  276. },
  277. },
  278. },
  279. accessConf: &AccessTokenConf{
  280. Url: "http://localhost:52345/token",
  281. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  282. Expire: "3600",
  283. ExpireInSecond: 3600,
  284. },
  285. tokens: map[string]interface{}{
  286. "token": DefaultToken,
  287. "refresh_token": RefreshToken,
  288. "client_id": "test",
  289. "expires": float64(36000),
  290. },
  291. },
  292. {
  293. name: "oAuth with access token and dynamic expire",
  294. props: map[string]interface{}{
  295. "url": "http://localhost:52345/",
  296. "headers": map[string]interface{}{
  297. "Authorization": "Bearer {{.token}}",
  298. },
  299. "oAuth": map[string]interface{}{
  300. "access": map[string]interface{}{
  301. "url": "http://localhost:52345/token",
  302. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  303. "expire": "{{.expires}}",
  304. },
  305. },
  306. },
  307. config: &RawConf{
  308. Url: "http://localhost:52345/",
  309. Method: http.MethodGet,
  310. Interval: DefaultInterval,
  311. Timeout: DefaultTimeout,
  312. BodyType: "none",
  313. ResponseType: "code",
  314. InsecureSkipVerify: true,
  315. Headers: map[string]interface{}{
  316. "Authorization": "Bearer {{.token}}",
  317. },
  318. HeadersMap: map[string]string{
  319. "Authorization": "Bearer {{.token}}",
  320. },
  321. OAuth: map[string]map[string]interface{}{
  322. "access": {
  323. "url": "http://localhost:52345/token",
  324. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  325. "expire": "{{.expires}}",
  326. },
  327. },
  328. },
  329. accessConf: &AccessTokenConf{
  330. Url: "http://localhost:52345/token",
  331. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  332. Expire: "{{.expires}}",
  333. ExpireInSecond: 36000,
  334. },
  335. tokens: map[string]interface{}{
  336. "token": DefaultToken,
  337. "refresh_token": RefreshToken,
  338. "client_id": "test",
  339. "expires": float64(36000),
  340. },
  341. },
  342. {
  343. name: "oAuth with access token and refresh token",
  344. props: map[string]interface{}{
  345. "url": "http://localhost:52345/",
  346. "headers": map[string]interface{}{
  347. "Authorization": "Bearer {{.token}}",
  348. },
  349. "oAuth": map[string]interface{}{
  350. "access": map[string]interface{}{
  351. "url": "http://localhost:52345/token",
  352. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  353. "expire": "3600",
  354. },
  355. "refresh": map[string]interface{}{
  356. "url": "http://localhost:52345/refresh",
  357. "headers": map[string]interface{}{
  358. "Authorization": "Bearer {{.token}}",
  359. "RefreshToken": "{{.refresh_token}}",
  360. },
  361. },
  362. },
  363. },
  364. config: &RawConf{
  365. Url: "http://localhost:52345/",
  366. Method: http.MethodGet,
  367. Interval: DefaultInterval,
  368. Timeout: DefaultTimeout,
  369. BodyType: "none",
  370. ResponseType: "code",
  371. InsecureSkipVerify: true,
  372. Headers: map[string]interface{}{
  373. "Authorization": "Bearer {{.token}}",
  374. },
  375. HeadersMap: map[string]string{
  376. "Authorization": "Bearer {{.token}}",
  377. },
  378. OAuth: map[string]map[string]interface{}{
  379. "access": {
  380. "url": "http://localhost:52345/token",
  381. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  382. "expire": "3600",
  383. },
  384. "refresh": {
  385. "url": "http://localhost:52345/refresh",
  386. "headers": map[string]interface{}{
  387. "Authorization": "Bearer {{.token}}",
  388. "RefreshToken": "{{.refresh_token}}",
  389. },
  390. },
  391. },
  392. },
  393. accessConf: &AccessTokenConf{
  394. Url: "http://localhost:52345/token",
  395. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  396. Expire: "3600",
  397. ExpireInSecond: 3600,
  398. },
  399. refreshConf: &RefreshTokenConf{
  400. Url: "http://localhost:52345/refresh",
  401. Headers: map[string]string{
  402. "Authorization": "Bearer {{.token}}",
  403. "RefreshToken": "{{.refresh_token}}",
  404. },
  405. },
  406. tokens: map[string]interface{}{
  407. "token": DefaultToken,
  408. "refresh_token": RefreshToken,
  409. "client_id": "test",
  410. "expires": float64(36000),
  411. },
  412. },
  413. // Wrong auth configs
  414. {
  415. name: "oAuth wrong access token config",
  416. props: map[string]interface{}{
  417. "url": "http://localhost:52345/",
  418. "headers": map[string]interface{}{
  419. "Authorization": "Bearer {{.token}}",
  420. },
  421. "oAuth": map[string]interface{}{
  422. "access": map[string]interface{}{
  423. "url": "http://localhost:52345/token",
  424. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  425. "expire": 3600,
  426. },
  427. },
  428. },
  429. err: errors.New("fail to parse the access properties of oAuth: 1 error(s) decoding:\n\n* 'expire' expected type 'string', got unconvertible type 'int', value: '3600'"),
  430. },
  431. {
  432. name: "oAuth wrong access url",
  433. props: map[string]interface{}{
  434. "url": "http://localhost:52345/",
  435. "headers": map[string]interface{}{
  436. "Authorization": "Bearer {{.token}}",
  437. },
  438. "oAuth": map[string]interface{}{
  439. "access": map[string]interface{}{
  440. "url": "",
  441. },
  442. },
  443. },
  444. err: errors.New("access token url is required"),
  445. },
  446. {
  447. name: "oAuth miss access",
  448. props: map[string]interface{}{
  449. "url": "http://localhost:52345/",
  450. "headers": map[string]interface{}{
  451. "Authorization": "Bearer {{.token}}",
  452. },
  453. "oAuth": map[string]interface{}{
  454. "refresh": map[string]interface{}{
  455. "url": "http://localhost:52345/",
  456. },
  457. },
  458. },
  459. err: errors.New("if setting oAuth, `access` property is required"),
  460. },
  461. {
  462. name: "oAuth wrong refresh token config",
  463. props: map[string]interface{}{
  464. "url": "http://localhost:52345/",
  465. "headers": map[string]interface{}{
  466. "Authorization": "Bearer {{.token}}",
  467. },
  468. "oAuth": map[string]interface{}{
  469. "access": map[string]interface{}{
  470. "url": "http://localhost:52345/token",
  471. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  472. "expire": "3600",
  473. },
  474. "refresh": map[string]interface{}{
  475. "url": 1202,
  476. },
  477. },
  478. },
  479. accessConf: &AccessTokenConf{
  480. Url: "http://localhost:52345/token",
  481. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  482. Expire: "3600",
  483. ExpireInSecond: 3600,
  484. },
  485. err: errors.New("fail to parse the refresh token properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '1202'"),
  486. },
  487. {
  488. name: "oAuth refresh token missing url",
  489. props: map[string]interface{}{
  490. "url": "http://localhost:52345/",
  491. "headers": map[string]interface{}{
  492. "Authorization": "Bearer {{.token}}",
  493. },
  494. "oAuth": map[string]interface{}{
  495. "access": map[string]interface{}{
  496. "url": "http://localhost:52345/token",
  497. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  498. "expire": "3600",
  499. },
  500. "refresh": map[string]interface{}{
  501. "url": "",
  502. },
  503. },
  504. },
  505. accessConf: &AccessTokenConf{
  506. Url: "http://localhost:52345/token",
  507. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  508. Expire: "3600",
  509. ExpireInSecond: 3600,
  510. },
  511. err: errors.New("refresh token url is required"),
  512. },
  513. // oAuth authentication flow errors
  514. {
  515. name: "oAuth auth error",
  516. props: map[string]interface{}{
  517. "url": "http://localhost:52345/",
  518. "headers": map[string]interface{}{
  519. "Authorization": "Bearer {{.token}}",
  520. },
  521. "oAuth": map[string]interface{}{
  522. "access": map[string]interface{}{
  523. "url": "http://localhost:52345/token",
  524. "body": "{\"username\": \"admin\",\"password\": \"1234\"}",
  525. "expire": "3600",
  526. },
  527. },
  528. },
  529. config: &RawConf{
  530. Url: "http://localhost:52345/",
  531. Method: http.MethodGet,
  532. Interval: DefaultInterval,
  533. Timeout: DefaultTimeout,
  534. BodyType: "json",
  535. ResponseType: "code",
  536. InsecureSkipVerify: true,
  537. Headers: map[string]string{
  538. "Authorization": "Bearer {{.token}}",
  539. },
  540. OAuth: map[string]map[string]interface{}{
  541. "access": {
  542. "url": "http://localhost:52345/token",
  543. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  544. "expire": "3600",
  545. },
  546. },
  547. },
  548. accessConf: &AccessTokenConf{
  549. Url: "http://localhost:52345/token",
  550. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  551. Expire: "3600",
  552. ExpireInSecond: 3600,
  553. },
  554. tokens: map[string]interface{}{
  555. "token": DefaultToken,
  556. "refresh_token": RefreshToken,
  557. "client_id": "test",
  558. "expires": float64(36000),
  559. },
  560. err: errors.New("fail to authorize by oAuth: Cannot parse access token response to json: http return code error: 400"),
  561. },
  562. {
  563. name: "oAuth refresh error",
  564. props: map[string]interface{}{
  565. "url": "http://localhost:52345/",
  566. "headers": map[string]interface{}{
  567. "Authorization": "Bearer {{.token}}",
  568. },
  569. "oAuth": map[string]interface{}{
  570. "access": map[string]interface{}{
  571. "url": "http://localhost:52345/token",
  572. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  573. "expire": "3600",
  574. },
  575. "refresh": map[string]interface{}{
  576. "url": "http://localhost:52345/refresh",
  577. "headers": map[string]interface{}{
  578. "Authorization": "Bearer {{.token}}",
  579. "RefreshToken": "{{.token}}",
  580. },
  581. },
  582. },
  583. },
  584. err: errors.New("fail to authorize by oAuth: Cannot parse refresh token response to json: http return code error: 400"),
  585. },
  586. {
  587. name: "oAuth wrong access expire template",
  588. props: map[string]interface{}{
  589. "url": "http://localhost:52345/",
  590. "headers": map[string]interface{}{
  591. "Authorization": "Bearer {{.token}}",
  592. },
  593. "oAuth": map[string]interface{}{
  594. "access": map[string]interface{}{
  595. "url": "http://localhost:52345/token",
  596. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  597. "expire": "{{..expp}}",
  598. },
  599. },
  600. },
  601. err: errors.New("fail to authorize by oAuth: fail to parse the expire time for access token: template: sink:1: unexpected . after term \".\""),
  602. },
  603. {
  604. name: "oAuth wrong access expire type",
  605. props: map[string]interface{}{
  606. "url": "http://localhost:52345/",
  607. "headers": map[string]interface{}{
  608. "Authorization": "Bearer {{.token}}",
  609. },
  610. "oAuth": map[string]interface{}{
  611. "access": map[string]interface{}{
  612. "url": "http://localhost:52345/token",
  613. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  614. "expire": "{{.token}}",
  615. },
  616. },
  617. },
  618. err: errors.New("fail to authorize by oAuth: fail to covert the expire time privatisation for access token: cannot convert string(privatisation) to int"),
  619. },
  620. {
  621. name: "oAuth wrong access url",
  622. props: map[string]interface{}{
  623. "url": "http://localhost:52345/",
  624. "headers": map[string]interface{}{
  625. "Authorization": "Bearer {{.token}}",
  626. },
  627. "oAuth": map[string]interface{}{
  628. "access": map[string]interface{}{
  629. "url": "http:localhost:52345/token",
  630. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  631. "expire": "{{.token}}",
  632. },
  633. },
  634. },
  635. err: errors.New("fail to authorize by oAuth: fail to get access token: Post \"http:localhost:52345/token\": http: no Host in request URL"),
  636. },
  637. {
  638. name: "oAuth wrong refresh header template",
  639. props: map[string]interface{}{
  640. "url": "http://localhost:52345/",
  641. "headers": map[string]interface{}{
  642. "Authorization": "Bearer {{.token}}",
  643. },
  644. "oAuth": map[string]interface{}{
  645. "access": map[string]interface{}{
  646. "url": "http://localhost:52345/token",
  647. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  648. "expire": "3600",
  649. },
  650. "refresh": map[string]interface{}{
  651. "url": "http://localhost:52345/refresh",
  652. "headers": map[string]interface{}{
  653. "Authorization": "Bearer {{.token}}",
  654. "RefreshToken": "{{..token}}",
  655. },
  656. },
  657. },
  658. },
  659. err: errors.New("fail to authorize by oAuth: fail to parse the header for refresh token request RefreshToken: template: sink:1: unexpected . after term \".\""),
  660. },
  661. {
  662. name: "oAuth wrong refresh url",
  663. props: map[string]interface{}{
  664. "url": "http://localhost:52345/",
  665. "headers": map[string]interface{}{
  666. "Authorization": "Bearer {{.token}}",
  667. },
  668. "oAuth": map[string]interface{}{
  669. "access": map[string]interface{}{
  670. "url": "http://localhost:52345/token",
  671. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  672. "expire": "3600",
  673. },
  674. "refresh": map[string]interface{}{
  675. "url": "http:localhost:52345/refresh2",
  676. "headers": map[string]interface{}{
  677. "Authorization": "Bearer {{.token}}",
  678. "RefreshToken": "{{.token}}",
  679. },
  680. },
  681. },
  682. },
  683. err: errors.New("fail to authorize by oAuth: fail to get refresh token: Post \"http:localhost:52345/refresh2\": http: no Host in request URL"),
  684. },
  685. }
  686. server := mockAuthServer()
  687. server.Start()
  688. defer server.Close()
  689. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  690. for i, tt := range tests {
  691. t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
  692. r := &PullSource{}
  693. err := r.Configure("", tt.props)
  694. if err != nil {
  695. if tt.err == nil {
  696. t.Errorf("Expected error: %v", err)
  697. } else {
  698. if err.Error() != tt.err.Error() {
  699. t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
  700. }
  701. }
  702. return
  703. }
  704. if !reflect.DeepEqual(r.config, tt.config) {
  705. t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
  706. }
  707. if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
  708. t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
  709. }
  710. if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
  711. t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
  712. }
  713. if !reflect.DeepEqual(r.tokens, tt.tokens) {
  714. t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
  715. }
  716. })
  717. }
  718. }
  719. func TestPullWithAuth(t *testing.T) {
  720. r := &PullSource{}
  721. server := mockAuthServer()
  722. server.Start()
  723. defer server.Close()
  724. err := r.Configure("data", map[string]interface{}{
  725. "url": "http://localhost:52345/",
  726. "interval": 100,
  727. "headers": map[string]interface{}{
  728. "Authorization": "Bearer {{.token}}",
  729. },
  730. "oAuth": map[string]interface{}{
  731. "access": map[string]interface{}{
  732. "url": "http://localhost:52345/token",
  733. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  734. "expire": "10",
  735. },
  736. "refresh": map[string]interface{}{
  737. "url": "http://localhost:52345/refresh",
  738. "headers": map[string]interface{}{
  739. "Authorization": "Bearer {{.token}}",
  740. "RefreshToken": "{{.refresh_token}}",
  741. },
  742. },
  743. },
  744. })
  745. if err != nil {
  746. t.Errorf(err.Error())
  747. return
  748. }
  749. exp := []api.SourceTuple{
  750. api.NewDefaultSourceTuple(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}),
  751. }
  752. mock.TestSourceOpen(r, exp, t)
  753. }
  754. func TestPullIncremental(t *testing.T) {
  755. r := &PullSource{}
  756. server := mockAuthServer()
  757. server.Start()
  758. defer server.Close()
  759. err := r.Configure("data2", map[string]interface{}{
  760. "url": "http://localhost:52345/",
  761. "interval": 100,
  762. "incremental": true,
  763. "responseType": "body",
  764. })
  765. if err != nil {
  766. t.Errorf(err.Error())
  767. return
  768. }
  769. exp := []api.SourceTuple{
  770. api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
  771. api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
  772. api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
  773. }
  774. mock.TestSourceOpen(r, exp, t)
  775. }