httppull_source_test.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package http
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "github.com/gorilla/mux"
  20. "github.com/lf-edge/ekuiper/internal/io/mock"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "path/filepath"
  26. "reflect"
  27. "strconv"
  28. "testing"
  29. )
  30. func jsonOut(w http.ResponseWriter, err error, out interface{}) {
  31. w.Header().Add("Content-Type", "application/json")
  32. enc := json.NewEncoder(w)
  33. err = enc.Encode(out)
  34. // Problems encoding
  35. if err != nil {
  36. http.Error(w, err.Error(), http.StatusBadRequest)
  37. }
  38. }
const (
	// DefaultToken is the access token handed out by the mock server's /token
	// and /refresh routes; /refresh and /data require it in the Authorization
	// header as "Bearer <token>".
	DefaultToken = "privatisation"
	// RefreshToken is the refresh token handed out by the mock server's /token
	// and /refresh routes; /refresh requires it in the RefreshToken header.
	RefreshToken = "privaterefresh"
)
  43. // mock http auth server
  44. func mockAuthServer() *httptest.Server {
  45. l, _ := net.Listen("tcp", "127.0.0.1:52345")
  46. router := mux.NewRouter()
  47. i := 0
  48. router.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) {
  49. body := &struct {
  50. Username string `json:"username"`
  51. Password string `json:"password"`
  52. }{}
  53. err := json.NewDecoder(r.Body).Decode(body)
  54. if err != nil {
  55. http.Error(w, err.Error(), http.StatusBadRequest)
  56. }
  57. if body.Username != "admin" || body.Password != "0000" {
  58. http.Error(w, "invalid username or password", http.StatusBadRequest)
  59. }
  60. out := &struct {
  61. Token string `json:"token"`
  62. RefreshToken string `json:"refresh_token"`
  63. ClientId string `json:"client_id"`
  64. Expires int64 `json:"expires"`
  65. }{
  66. Token: DefaultToken,
  67. RefreshToken: RefreshToken,
  68. ClientId: "test",
  69. Expires: 36000,
  70. }
  71. jsonOut(w, err, out)
  72. }).Methods(http.MethodPost)
  73. router.HandleFunc("/refresh", func(w http.ResponseWriter, r *http.Request) {
  74. token := r.Header.Get("Authorization")
  75. if token != "Bearer "+DefaultToken {
  76. http.Error(w, "invalid token", http.StatusBadRequest)
  77. }
  78. rt := r.Header.Get("RefreshToken")
  79. if rt != RefreshToken {
  80. http.Error(w, "invalid refresh token", http.StatusBadRequest)
  81. }
  82. out := &struct {
  83. Token string `json:"token"`
  84. RefreshToken string `json:"refresh_token"`
  85. ClientId string `json:"client_id"`
  86. Expires int64 `json:"expires"`
  87. }{
  88. Token: DefaultToken,
  89. RefreshToken: RefreshToken,
  90. ClientId: "test",
  91. Expires: 36000,
  92. }
  93. jsonOut(w, nil, out)
  94. }).Methods(http.MethodPost)
  95. router.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
  96. token := r.Header.Get("Authorization")
  97. if token != "Bearer "+DefaultToken {
  98. http.Error(w, "invalid token", http.StatusBadRequest)
  99. }
  100. out := &struct {
  101. DeviceId string `json:"device_id"`
  102. Temperature float64 `json:"temperature"`
  103. Humidity float64 `json:"humidity"`
  104. }{
  105. DeviceId: "device1",
  106. Temperature: 25.5,
  107. Humidity: 60.0,
  108. }
  109. jsonOut(w, nil, out)
  110. }).Methods(http.MethodGet)
  111. // Return same data for 3 times
  112. router.HandleFunc("/data2", func(w http.ResponseWriter, r *http.Request) {
  113. out := &struct {
  114. Code int `json:"code"`
  115. Data struct {
  116. DeviceId string `json:"device_id"`
  117. Temperature float64 `json:"temperature"`
  118. Humidity float64 `json:"humidity"`
  119. } `json:"data"`
  120. }{
  121. Code: 200,
  122. Data: struct {
  123. DeviceId string `json:"device_id"`
  124. Temperature float64 `json:"temperature"`
  125. Humidity float64 `json:"humidity"`
  126. }{
  127. DeviceId: "device" + strconv.Itoa(i/3),
  128. Temperature: 25.5,
  129. Humidity: 60.0,
  130. },
  131. }
  132. i++
  133. jsonOut(w, nil, out)
  134. }).Methods(http.MethodGet)
  135. server := httptest.NewUnstartedServer(router)
  136. server.Listener.Close()
  137. server.Listener = l
  138. return server
  139. }
// wrongPath is the absolute form of "/tmp/wrong". The "wrong tls" case of
// TestConfigure passes it as certificationPath and expects a
// "no such file or directory" error mentioning it. The error returned by
// filepath.Abs is discarded.
var wrongPath, _ = filepath.Abs("/tmp/wrong")
  141. // Test configure to properties
  142. func TestConfigure(t *testing.T) {
  143. tests := []struct {
  144. name string
  145. props map[string]interface{}
  146. err error
  147. config *RawConf
  148. accessConf *AccessTokenConf
  149. refreshConf *RefreshTokenConf
  150. tokens map[string]interface{}
  151. }{
  152. {
  153. name: "default",
  154. props: map[string]interface{}{
  155. "incremental": true,
  156. "url": "http://localhost:9090/",
  157. },
  158. config: &RawConf{
  159. Incremental: true,
  160. Url: "http://localhost:9090/",
  161. Method: http.MethodGet,
  162. Interval: DefaultInterval,
  163. Timeout: DefaultTimeout,
  164. BodyType: "none",
  165. ResponseType: "code",
  166. InsecureSkipVerify: true,
  167. },
  168. },
  169. // Test wrong properties
  170. {
  171. name: "wrong props",
  172. props: map[string]interface{}{
  173. "incremental": true,
  174. "url": 123,
  175. },
  176. err: fmt.Errorf("fail to parse the properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '123'"),
  177. },
  178. {
  179. name: "empty url",
  180. props: map[string]interface{}{
  181. "incremental": true,
  182. "url": "",
  183. },
  184. err: fmt.Errorf("url is required"),
  185. },
  186. {
  187. name: "wrong method",
  188. props: map[string]interface{}{
  189. "url": "http://localhost:9090/",
  190. "method": "wrong",
  191. },
  192. err: fmt.Errorf("Not supported HTTP method wrong."),
  193. },
  194. {
  195. name: "wrong bodytype",
  196. props: map[string]interface{}{
  197. "url": "http://localhost:9090/",
  198. "bodyType": "wrong",
  199. },
  200. err: fmt.Errorf("Not valid body type value wrong."),
  201. },
  202. {
  203. name: "wrong response type",
  204. props: map[string]interface{}{
  205. "url": "http://localhost:9090/",
  206. "responseType": "wrong",
  207. },
  208. err: fmt.Errorf("Not valid response type value wrong."),
  209. },
  210. {
  211. name: "wrong url",
  212. props: map[string]interface{}{
  213. "url": "http:/localhost:9090/",
  214. },
  215. err: fmt.Errorf("Invalid url, host not found"),
  216. },
  217. {
  218. name: "wrong interval",
  219. props: map[string]interface{}{
  220. "url": "http:/localhost:9090/",
  221. "interval": -2,
  222. },
  223. err: fmt.Errorf("interval must be greater than 0"),
  224. },
  225. {
  226. name: "wrong timeout",
  227. props: map[string]interface{}{
  228. "url": "http:/localhost:9090/",
  229. "timeout": -2,
  230. },
  231. err: fmt.Errorf("timeout must be greater than or equal to 0"),
  232. },
  233. {
  234. name: "wrong tls",
  235. props: map[string]interface{}{
  236. "url": "http://localhost:9090/",
  237. "certificationPath": wrongPath,
  238. },
  239. err: fmt.Errorf(fmt.Sprintf("stat %s: no such file or directory", wrongPath)),
  240. },
  241. // Test oAuth
  242. {
  243. name: "oAuth with access token and constant expire",
  244. props: map[string]interface{}{
  245. "url": "http://localhost:52345/",
  246. "headers": map[string]interface{}{
  247. "Authorization": "Bearer {{.token}}",
  248. },
  249. "oAuth": map[string]interface{}{
  250. "access": map[string]interface{}{
  251. "url": "http://localhost:52345/token",
  252. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  253. "expire": "3600",
  254. },
  255. },
  256. },
  257. config: &RawConf{
  258. Url: "http://localhost:52345/",
  259. Method: http.MethodGet,
  260. Interval: DefaultInterval,
  261. Timeout: DefaultTimeout,
  262. BodyType: "none",
  263. ResponseType: "code",
  264. InsecureSkipVerify: true,
  265. Headers: map[string]interface{}{
  266. "Authorization": "Bearer {{.token}}",
  267. },
  268. HeadersMap: map[string]string{
  269. "Authorization": "Bearer {{.token}}",
  270. },
  271. OAuth: map[string]map[string]interface{}{
  272. "access": {
  273. "url": "http://localhost:52345/token",
  274. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  275. "expire": "3600",
  276. },
  277. },
  278. },
  279. accessConf: &AccessTokenConf{
  280. Url: "http://localhost:52345/token",
  281. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  282. Expire: "3600",
  283. ExpireInSecond: 3600,
  284. },
  285. tokens: map[string]interface{}{
  286. "token": DefaultToken,
  287. "refresh_token": RefreshToken,
  288. "client_id": "test",
  289. "expires": float64(36000),
  290. },
  291. },
  292. {
  293. name: "oAuth with access token and dynamic expire",
  294. props: map[string]interface{}{
  295. "url": "http://localhost:52345/",
  296. "headers": map[string]interface{}{
  297. "Authorization": "Bearer {{.token}}",
  298. },
  299. "oAuth": map[string]interface{}{
  300. "access": map[string]interface{}{
  301. "url": "http://localhost:52345/token",
  302. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  303. "expire": "{{.expires}}",
  304. },
  305. },
  306. },
  307. config: &RawConf{
  308. Url: "http://localhost:52345/",
  309. Method: http.MethodGet,
  310. Interval: DefaultInterval,
  311. Timeout: DefaultTimeout,
  312. BodyType: "none",
  313. ResponseType: "code",
  314. InsecureSkipVerify: true,
  315. Headers: map[string]interface{}{
  316. "Authorization": "Bearer {{.token}}",
  317. },
  318. HeadersMap: map[string]string{
  319. "Authorization": "Bearer {{.token}}",
  320. },
  321. OAuth: map[string]map[string]interface{}{
  322. "access": {
  323. "url": "http://localhost:52345/token",
  324. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  325. "expire": "{{.expires}}",
  326. },
  327. },
  328. },
  329. accessConf: &AccessTokenConf{
  330. Url: "http://localhost:52345/token",
  331. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  332. Expire: "{{.expires}}",
  333. ExpireInSecond: 36000,
  334. },
  335. tokens: map[string]interface{}{
  336. "token": DefaultToken,
  337. "refresh_token": RefreshToken,
  338. "client_id": "test",
  339. "expires": float64(36000),
  340. },
  341. },
  342. {
  343. name: "oAuth with access token and refresh token",
  344. props: map[string]interface{}{
  345. "url": "http://localhost:52345/",
  346. "headers": map[string]interface{}{
  347. "Authorization": "Bearer {{.token}}",
  348. },
  349. "oAuth": map[string]interface{}{
  350. "access": map[string]interface{}{
  351. "url": "http://localhost:52345/token",
  352. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  353. "expire": "3600",
  354. },
  355. "refresh": map[string]interface{}{
  356. "url": "http://localhost:52345/refresh",
  357. "headers": map[string]interface{}{
  358. "Authorization": "Bearer {{.token}}",
  359. "RefreshToken": "{{.refresh_token}}",
  360. },
  361. },
  362. },
  363. },
  364. config: &RawConf{
  365. Url: "http://localhost:52345/",
  366. Method: http.MethodGet,
  367. Interval: DefaultInterval,
  368. Timeout: DefaultTimeout,
  369. BodyType: "none",
  370. ResponseType: "code",
  371. InsecureSkipVerify: true,
  372. Headers: map[string]interface{}{
  373. "Authorization": "Bearer {{.token}}",
  374. },
  375. HeadersMap: map[string]string{
  376. "Authorization": "Bearer {{.token}}",
  377. },
  378. OAuth: map[string]map[string]interface{}{
  379. "access": {
  380. "url": "http://localhost:52345/token",
  381. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  382. "expire": "3600",
  383. },
  384. "refresh": {
  385. "url": "http://localhost:52345/refresh",
  386. "headers": map[string]interface{}{
  387. "Authorization": "Bearer {{.token}}",
  388. "RefreshToken": "{{.refresh_token}}",
  389. },
  390. },
  391. },
  392. },
  393. accessConf: &AccessTokenConf{
  394. Url: "http://localhost:52345/token",
  395. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  396. Expire: "3600",
  397. ExpireInSecond: 3600,
  398. },
  399. refreshConf: &RefreshTokenConf{
  400. Url: "http://localhost:52345/refresh",
  401. Headers: map[string]string{
  402. "Authorization": "Bearer {{.token}}",
  403. "RefreshToken": "{{.refresh_token}}",
  404. },
  405. },
  406. tokens: map[string]interface{}{
  407. "token": DefaultToken,
  408. "refresh_token": RefreshToken,
  409. "client_id": "test",
  410. "expires": float64(36000),
  411. },
  412. },
  413. // Wrong auth configs
  414. {
  415. name: "oAuth wrong access token config",
  416. props: map[string]interface{}{
  417. "url": "http://localhost:52345/",
  418. "headers": map[string]interface{}{
  419. "Authorization": "Bearer {{.token}}",
  420. },
  421. "oAuth": map[string]interface{}{
  422. "access": map[string]interface{}{
  423. "url": "http://localhost:52345/token",
  424. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  425. "expire": 3600,
  426. },
  427. },
  428. },
  429. err: errors.New("fail to parse the access properties of oAuth: 1 error(s) decoding:\n\n* 'expire' expected type 'string', got unconvertible type 'int', value: '3600'"),
  430. },
  431. {
  432. name: "oAuth wrong access url",
  433. props: map[string]interface{}{
  434. "url": "http://localhost:52345/",
  435. "headers": map[string]interface{}{
  436. "Authorization": "Bearer {{.token}}",
  437. },
  438. "oAuth": map[string]interface{}{
  439. "access": map[string]interface{}{
  440. "url": "",
  441. },
  442. },
  443. },
  444. config: &RawConf{
  445. Url: "http://localhost:52345/",
  446. Method: http.MethodGet,
  447. Interval: DefaultInterval,
  448. Timeout: DefaultTimeout,
  449. BodyType: "none",
  450. ResponseType: "code",
  451. InsecureSkipVerify: true,
  452. Headers: map[string]interface{}{
  453. "Authorization": "Bearer {{.token}}",
  454. },
  455. HeadersMap: map[string]string{
  456. "Authorization": "Bearer {{.token}}",
  457. },
  458. },
  459. },
  460. {
  461. name: "oAuth miss access",
  462. props: map[string]interface{}{
  463. "url": "http://localhost:52345/",
  464. "headers": map[string]interface{}{
  465. "Authorization": "Bearer {{.token}}",
  466. },
  467. "oAuth": map[string]interface{}{
  468. "refresh": map[string]interface{}{
  469. "url": "http://localhost:52345/",
  470. },
  471. },
  472. },
  473. err: errors.New("if setting oAuth, `access` property is required"),
  474. },
  475. {
  476. name: "oAuth wrong refresh token config",
  477. props: map[string]interface{}{
  478. "url": "http://localhost:52345/",
  479. "headers": map[string]interface{}{
  480. "Authorization": "Bearer {{.token}}",
  481. },
  482. "oAuth": map[string]interface{}{
  483. "access": map[string]interface{}{
  484. "url": "http://localhost:52345/token",
  485. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  486. "expire": "3600",
  487. },
  488. "refresh": map[string]interface{}{
  489. "url": 1202,
  490. },
  491. },
  492. },
  493. accessConf: &AccessTokenConf{
  494. Url: "http://localhost:52345/token",
  495. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  496. Expire: "3600",
  497. ExpireInSecond: 3600,
  498. },
  499. err: errors.New("fail to parse the refresh token properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '1202'"),
  500. },
  501. {
  502. name: "oAuth refresh token missing url",
  503. props: map[string]interface{}{
  504. "url": "http://localhost:52345/",
  505. "headers": map[string]interface{}{
  506. "Authorization": "Bearer {{.token}}",
  507. },
  508. "oAuth": map[string]interface{}{
  509. "access": map[string]interface{}{
  510. "url": "http://localhost:52345/token",
  511. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  512. "expire": "3600",
  513. },
  514. },
  515. },
  516. config: &RawConf{
  517. Url: "http://localhost:52345/",
  518. Method: http.MethodGet,
  519. Interval: DefaultInterval,
  520. Timeout: DefaultTimeout,
  521. BodyType: "none",
  522. ResponseType: "code",
  523. InsecureSkipVerify: true,
  524. Headers: map[string]interface{}{
  525. "Authorization": "Bearer {{.token}}",
  526. },
  527. HeadersMap: map[string]string{
  528. "Authorization": "Bearer {{.token}}",
  529. },
  530. OAuth: map[string]map[string]interface{}{
  531. "access": {
  532. "url": "http://localhost:52345/token",
  533. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  534. "expire": "3600",
  535. },
  536. },
  537. },
  538. accessConf: &AccessTokenConf{
  539. Url: "http://localhost:52345/token",
  540. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  541. Expire: "3600",
  542. ExpireInSecond: 3600,
  543. },
  544. tokens: map[string]interface{}{
  545. "token": DefaultToken,
  546. "refresh_token": RefreshToken,
  547. "client_id": "test",
  548. "expires": float64(36000),
  549. },
  550. },
  551. // oAuth authentication flow errors
  552. {
  553. name: "oAuth auth error",
  554. props: map[string]interface{}{
  555. "url": "http://localhost:52345/",
  556. "headers": map[string]interface{}{
  557. "Authorization": "Bearer {{.token}}",
  558. },
  559. "oAuth": map[string]interface{}{
  560. "access": map[string]interface{}{
  561. "url": "http://localhost:52345/token",
  562. "body": "{\"username\": \"admin\",\"password\": \"1234\"}",
  563. "expire": "3600",
  564. },
  565. },
  566. },
  567. config: &RawConf{
  568. Url: "http://localhost:52345/",
  569. Method: http.MethodGet,
  570. Interval: DefaultInterval,
  571. Timeout: DefaultTimeout,
  572. BodyType: "json",
  573. ResponseType: "code",
  574. InsecureSkipVerify: true,
  575. Headers: map[string]string{
  576. "Authorization": "Bearer {{.token}}",
  577. },
  578. OAuth: map[string]map[string]interface{}{
  579. "access": {
  580. "url": "http://localhost:52345/token",
  581. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  582. "expire": "3600",
  583. },
  584. },
  585. },
  586. accessConf: &AccessTokenConf{
  587. Url: "http://localhost:52345/token",
  588. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  589. Expire: "3600",
  590. ExpireInSecond: 3600,
  591. },
  592. tokens: map[string]interface{}{
  593. "token": DefaultToken,
  594. "refresh_token": RefreshToken,
  595. "client_id": "test",
  596. "expires": float64(36000),
  597. },
  598. err: errors.New("fail to authorize by oAuth: Cannot parse access token response to json: http return code error: 400"),
  599. },
  600. {
  601. name: "oAuth refresh error",
  602. props: map[string]interface{}{
  603. "url": "http://localhost:52345/",
  604. "headers": map[string]interface{}{
  605. "Authorization": "Bearer {{.token}}",
  606. },
  607. "oAuth": map[string]interface{}{
  608. "access": map[string]interface{}{
  609. "url": "http://localhost:52345/token",
  610. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  611. "expire": "3600",
  612. },
  613. "refresh": map[string]interface{}{
  614. "url": "http://localhost:52345/refresh",
  615. "headers": map[string]interface{}{
  616. "Authorization": "Bearer {{.token}}",
  617. "RefreshToken": "{{.token}}",
  618. },
  619. },
  620. },
  621. },
  622. err: errors.New("fail to authorize by oAuth: Cannot parse refresh token response to json: http return code error: 400"),
  623. },
  624. {
  625. name: "oAuth wrong access expire template",
  626. props: map[string]interface{}{
  627. "url": "http://localhost:52345/",
  628. "headers": map[string]interface{}{
  629. "Authorization": "Bearer {{.token}}",
  630. },
  631. "oAuth": map[string]interface{}{
  632. "access": map[string]interface{}{
  633. "url": "http://localhost:52345/token",
  634. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  635. "expire": "{{..expp}}",
  636. },
  637. },
  638. },
  639. err: errors.New("fail to authorize by oAuth: fail to parse the expire time for access token: template: sink:1: unexpected . after term \".\""),
  640. },
  641. {
  642. name: "oAuth wrong access expire type",
  643. props: map[string]interface{}{
  644. "url": "http://localhost:52345/",
  645. "headers": map[string]interface{}{
  646. "Authorization": "Bearer {{.token}}",
  647. },
  648. "oAuth": map[string]interface{}{
  649. "access": map[string]interface{}{
  650. "url": "http://localhost:52345/token",
  651. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  652. "expire": "{{.token}}",
  653. },
  654. },
  655. },
  656. err: errors.New("fail to authorize by oAuth: fail to covert the expire time privatisation for access token: cannot convert string(privatisation) to int"),
  657. },
  658. {
  659. name: "oAuth wrong access url",
  660. props: map[string]interface{}{
  661. "url": "http://localhost:52345/",
  662. "headers": map[string]interface{}{
  663. "Authorization": "Bearer {{.token}}",
  664. },
  665. "oAuth": map[string]interface{}{
  666. "access": map[string]interface{}{
  667. "url": "http:localhost:52345/token",
  668. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  669. "expire": "{{.token}}",
  670. },
  671. },
  672. },
  673. err: errors.New("fail to authorize by oAuth: fail to get access token: Post \"http:localhost:52345/token\": http: no Host in request URL"),
  674. },
  675. {
  676. name: "oAuth wrong refresh header template",
  677. props: map[string]interface{}{
  678. "url": "http://localhost:52345/",
  679. "headers": map[string]interface{}{
  680. "Authorization": "Bearer {{.token}}",
  681. },
  682. "oAuth": map[string]interface{}{
  683. "access": map[string]interface{}{
  684. "url": "http://localhost:52345/token",
  685. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  686. "expire": "3600",
  687. },
  688. "refresh": map[string]interface{}{
  689. "url": "http://localhost:52345/refresh",
  690. "headers": map[string]interface{}{
  691. "Authorization": "Bearer {{.token}}",
  692. "RefreshToken": "{{..token}}",
  693. },
  694. },
  695. },
  696. },
  697. err: errors.New("fail to authorize by oAuth: fail to parse the header for refresh token request RefreshToken: template: sink:1: unexpected . after term \".\""),
  698. },
  699. {
  700. name: "oAuth wrong refresh url",
  701. props: map[string]interface{}{
  702. "url": "http://localhost:52345/",
  703. "headers": map[string]interface{}{
  704. "Authorization": "Bearer {{.token}}",
  705. },
  706. "oAuth": map[string]interface{}{
  707. "access": map[string]interface{}{
  708. "url": "http://localhost:52345/token",
  709. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  710. "expire": "3600",
  711. },
  712. "refresh": map[string]interface{}{
  713. "url": "http:localhost:52345/refresh2",
  714. "headers": map[string]interface{}{
  715. "Authorization": "Bearer {{.token}}",
  716. "RefreshToken": "{{.token}}",
  717. },
  718. },
  719. },
  720. },
  721. err: errors.New("fail to authorize by oAuth: fail to get refresh token: Post \"http:localhost:52345/refresh2\": http: no Host in request URL"),
  722. },
  723. }
  724. server := mockAuthServer()
  725. server.Start()
  726. defer server.Close()
  727. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  728. for i, tt := range tests {
  729. t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
  730. r := &PullSource{}
  731. err := r.Configure("", tt.props)
  732. if err != nil {
  733. if tt.err == nil {
  734. t.Errorf("Expected error: %v", err)
  735. } else {
  736. if err.Error() != tt.err.Error() {
  737. t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
  738. }
  739. }
  740. return
  741. }
  742. if !reflect.DeepEqual(r.config, tt.config) {
  743. t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
  744. }
  745. if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
  746. t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
  747. }
  748. if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
  749. t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
  750. }
  751. if !reflect.DeepEqual(r.tokens, tt.tokens) {
  752. t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
  753. }
  754. })
  755. }
  756. }
  757. func TestPullWithAuth(t *testing.T) {
  758. r := &PullSource{}
  759. server := mockAuthServer()
  760. server.Start()
  761. defer server.Close()
  762. err := r.Configure("data", map[string]interface{}{
  763. "url": "http://localhost:52345/",
  764. "interval": 100,
  765. "headers": map[string]interface{}{
  766. "Authorization": "Bearer {{.token}}",
  767. },
  768. "oAuth": map[string]interface{}{
  769. "access": map[string]interface{}{
  770. "url": "http://localhost:52345/token",
  771. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  772. "expire": "10",
  773. },
  774. "refresh": map[string]interface{}{
  775. "url": "http://localhost:52345/refresh",
  776. "headers": map[string]interface{}{
  777. "Authorization": "Bearer {{.token}}",
  778. "RefreshToken": "{{.refresh_token}}",
  779. },
  780. },
  781. },
  782. })
  783. if err != nil {
  784. t.Errorf(err.Error())
  785. return
  786. }
  787. exp := []api.SourceTuple{
  788. api.NewDefaultSourceTuple(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}),
  789. }
  790. mock.TestSourceOpen(r, exp, t)
  791. }
  792. func TestPullIncremental(t *testing.T) {
  793. r := &PullSource{}
  794. server := mockAuthServer()
  795. server.Start()
  796. defer server.Close()
  797. err := r.Configure("data2", map[string]interface{}{
  798. "url": "http://localhost:52345/",
  799. "interval": 100,
  800. "incremental": true,
  801. "responseType": "body",
  802. })
  803. if err != nil {
  804. t.Errorf(err.Error())
  805. return
  806. }
  807. exp := []api.SourceTuple{
  808. api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
  809. api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
  810. api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
  811. }
  812. mock.TestSourceOpen(r, exp, t)
  813. }