httppull_source_test.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package http
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "github.com/benbjohnson/clock"
  20. "github.com/gorilla/mux"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/io/mock"
  23. "github.com/lf-edge/ekuiper/pkg/api"
  24. "net"
  25. "net/http"
  26. "net/http/httptest"
  27. "path/filepath"
  28. "reflect"
  29. "strconv"
  30. "testing"
  31. )
  32. func jsonOut(w http.ResponseWriter, err error, out interface{}) {
  33. w.Header().Add("Content-Type", "application/json")
  34. enc := json.NewEncoder(w)
  35. err = enc.Encode(out)
  36. // Problems encoding
  37. if err != nil {
  38. http.Error(w, err.Error(), http.StatusBadRequest)
  39. }
  40. }
  41. const (
  42. DefaultToken = "privatisation"
  43. RefreshToken = "privaterefresh"
  44. )
  45. // mock http auth server
  46. func mockAuthServer() *httptest.Server {
  47. l, _ := net.Listen("tcp", "127.0.0.1:52345")
  48. router := mux.NewRouter()
  49. i := 0
  50. router.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) {
  51. body := &struct {
  52. Username string `json:"username"`
  53. Password string `json:"password"`
  54. }{}
  55. err := json.NewDecoder(r.Body).Decode(body)
  56. if err != nil {
  57. http.Error(w, err.Error(), http.StatusBadRequest)
  58. }
  59. if body.Username != "admin" || body.Password != "0000" {
  60. http.Error(w, "invalid username or password", http.StatusBadRequest)
  61. }
  62. out := &struct {
  63. Token string `json:"token"`
  64. RefreshToken string `json:"refresh_token"`
  65. ClientId string `json:"client_id"`
  66. Expires int64 `json:"expires"`
  67. }{
  68. Token: DefaultToken,
  69. RefreshToken: RefreshToken,
  70. ClientId: "test",
  71. Expires: 36000,
  72. }
  73. jsonOut(w, err, out)
  74. }).Methods(http.MethodPost)
  75. router.HandleFunc("/refresh", func(w http.ResponseWriter, r *http.Request) {
  76. token := r.Header.Get("Authorization")
  77. if token != "Bearer "+DefaultToken {
  78. http.Error(w, "invalid token", http.StatusBadRequest)
  79. }
  80. rt := r.Header.Get("RefreshToken")
  81. if rt != RefreshToken {
  82. http.Error(w, "invalid refresh token", http.StatusBadRequest)
  83. }
  84. out := &struct {
  85. Token string `json:"token"`
  86. RefreshToken string `json:"refresh_token"`
  87. ClientId string `json:"client_id"`
  88. Expires int64 `json:"expires"`
  89. }{
  90. Token: DefaultToken,
  91. RefreshToken: RefreshToken,
  92. ClientId: "test",
  93. Expires: 36000,
  94. }
  95. jsonOut(w, nil, out)
  96. }).Methods(http.MethodPost)
  97. router.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
  98. token := r.Header.Get("Authorization")
  99. if token != "Bearer "+DefaultToken {
  100. http.Error(w, "invalid token", http.StatusBadRequest)
  101. }
  102. out := &struct {
  103. DeviceId string `json:"device_id"`
  104. Temperature float64 `json:"temperature"`
  105. Humidity float64 `json:"humidity"`
  106. }{
  107. DeviceId: "device1",
  108. Temperature: 25.5,
  109. Humidity: 60.0,
  110. }
  111. jsonOut(w, nil, out)
  112. }).Methods(http.MethodGet)
  113. // Return same data for 3 times
  114. router.HandleFunc("/data2", func(w http.ResponseWriter, r *http.Request) {
  115. out := &struct {
  116. Code int `json:"code"`
  117. Data struct {
  118. DeviceId string `json:"device_id"`
  119. Temperature float64 `json:"temperature"`
  120. Humidity float64 `json:"humidity"`
  121. } `json:"data"`
  122. }{
  123. Code: 200,
  124. Data: struct {
  125. DeviceId string `json:"device_id"`
  126. Temperature float64 `json:"temperature"`
  127. Humidity float64 `json:"humidity"`
  128. }{
  129. DeviceId: "device" + strconv.Itoa(i/3),
  130. Temperature: 25.5,
  131. Humidity: 60.0,
  132. },
  133. }
  134. i++
  135. jsonOut(w, nil, out)
  136. }).Methods(http.MethodGet)
  137. server := httptest.NewUnstartedServer(router)
  138. server.Listener.Close()
  139. server.Listener = l
  140. return server
  141. }
  142. var wrongPath, _ = filepath.Abs("/tmp/wrong")
  143. // Test configure to properties
  144. func TestConfigure(t *testing.T) {
  145. tests := []struct {
  146. name string
  147. props map[string]interface{}
  148. err error
  149. config *RawConf
  150. accessConf *AccessTokenConf
  151. refreshConf *RefreshTokenConf
  152. tokens map[string]interface{}
  153. }{
  154. {
  155. name: "default",
  156. props: map[string]interface{}{
  157. "incremental": true,
  158. "url": "http://localhost:9090/",
  159. },
  160. config: &RawConf{
  161. Incremental: true,
  162. Url: "http://localhost:9090/",
  163. Method: http.MethodGet,
  164. Interval: DefaultInterval,
  165. Timeout: DefaultTimeout,
  166. BodyType: "none",
  167. ResponseType: "code",
  168. InsecureSkipVerify: true,
  169. },
  170. },
  171. // Test wrong properties
  172. {
  173. name: "wrong props",
  174. props: map[string]interface{}{
  175. "incremental": true,
  176. "url": 123,
  177. },
  178. err: fmt.Errorf("fail to parse the properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '123'"),
  179. },
  180. {
  181. name: "empty url",
  182. props: map[string]interface{}{
  183. "incremental": true,
  184. "url": "",
  185. },
  186. err: fmt.Errorf("url is required"),
  187. },
  188. {
  189. name: "wrong method",
  190. props: map[string]interface{}{
  191. "url": "http://localhost:9090/",
  192. "method": "wrong",
  193. },
  194. err: fmt.Errorf("Not supported HTTP method wrong."),
  195. },
  196. {
  197. name: "wrong bodytype",
  198. props: map[string]interface{}{
  199. "url": "http://localhost:9090/",
  200. "bodyType": "wrong",
  201. },
  202. err: fmt.Errorf("Not valid body type value wrong."),
  203. },
  204. {
  205. name: "wrong response type",
  206. props: map[string]interface{}{
  207. "url": "http://localhost:9090/",
  208. "responseType": "wrong",
  209. },
  210. err: fmt.Errorf("Not valid response type value wrong."),
  211. },
  212. {
  213. name: "wrong url",
  214. props: map[string]interface{}{
  215. "url": "http:/localhost:9090/",
  216. },
  217. err: fmt.Errorf("Invalid url, host not found"),
  218. },
  219. {
  220. name: "wrong interval",
  221. props: map[string]interface{}{
  222. "url": "http:/localhost:9090/",
  223. "interval": -2,
  224. },
  225. err: fmt.Errorf("interval must be greater than 0"),
  226. },
  227. {
  228. name: "wrong timeout",
  229. props: map[string]interface{}{
  230. "url": "http:/localhost:9090/",
  231. "timeout": -2,
  232. },
  233. err: fmt.Errorf("timeout must be greater than or equal to 0"),
  234. },
  235. {
  236. name: "wrong tls",
  237. props: map[string]interface{}{
  238. "url": "http://localhost:9090/",
  239. "certificationPath": wrongPath,
  240. },
  241. err: fmt.Errorf(fmt.Sprintf("stat %s: no such file or directory", wrongPath)),
  242. },
  243. // Test oAuth
  244. {
  245. name: "oAuth with access token and constant expire",
  246. props: map[string]interface{}{
  247. "url": "http://localhost:52345/",
  248. "headers": map[string]interface{}{
  249. "Authorization": "Bearer {{.token}}",
  250. },
  251. "oAuth": map[string]interface{}{
  252. "access": map[string]interface{}{
  253. "url": "http://localhost:52345/token",
  254. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  255. "expire": "3600",
  256. },
  257. },
  258. },
  259. config: &RawConf{
  260. Url: "http://localhost:52345/",
  261. Method: http.MethodGet,
  262. Interval: DefaultInterval,
  263. Timeout: DefaultTimeout,
  264. BodyType: "none",
  265. ResponseType: "code",
  266. InsecureSkipVerify: true,
  267. Headers: map[string]interface{}{
  268. "Authorization": "Bearer {{.token}}",
  269. },
  270. HeadersMap: map[string]string{
  271. "Authorization": "Bearer {{.token}}",
  272. },
  273. OAuth: map[string]map[string]interface{}{
  274. "access": {
  275. "url": "http://localhost:52345/token",
  276. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  277. "expire": "3600",
  278. },
  279. },
  280. },
  281. accessConf: &AccessTokenConf{
  282. Url: "http://localhost:52345/token",
  283. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  284. Expire: "3600",
  285. ExpireInSecond: 3600,
  286. },
  287. tokens: map[string]interface{}{
  288. "token": DefaultToken,
  289. "refresh_token": RefreshToken,
  290. "client_id": "test",
  291. "expires": float64(36000),
  292. },
  293. },
  294. {
  295. name: "oAuth with access token and dynamic expire",
  296. props: map[string]interface{}{
  297. "url": "http://localhost:52345/",
  298. "headers": map[string]interface{}{
  299. "Authorization": "Bearer {{.token}}",
  300. },
  301. "oAuth": map[string]interface{}{
  302. "access": map[string]interface{}{
  303. "url": "http://localhost:52345/token",
  304. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  305. "expire": "{{.expires}}",
  306. },
  307. },
  308. },
  309. config: &RawConf{
  310. Url: "http://localhost:52345/",
  311. Method: http.MethodGet,
  312. Interval: DefaultInterval,
  313. Timeout: DefaultTimeout,
  314. BodyType: "none",
  315. ResponseType: "code",
  316. InsecureSkipVerify: true,
  317. Headers: map[string]interface{}{
  318. "Authorization": "Bearer {{.token}}",
  319. },
  320. HeadersMap: map[string]string{
  321. "Authorization": "Bearer {{.token}}",
  322. },
  323. OAuth: map[string]map[string]interface{}{
  324. "access": {
  325. "url": "http://localhost:52345/token",
  326. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  327. "expire": "{{.expires}}",
  328. },
  329. },
  330. },
  331. accessConf: &AccessTokenConf{
  332. Url: "http://localhost:52345/token",
  333. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  334. Expire: "{{.expires}}",
  335. ExpireInSecond: 36000,
  336. },
  337. tokens: map[string]interface{}{
  338. "token": DefaultToken,
  339. "refresh_token": RefreshToken,
  340. "client_id": "test",
  341. "expires": float64(36000),
  342. },
  343. },
  344. {
  345. name: "oAuth with access token and refresh token",
  346. props: map[string]interface{}{
  347. "url": "http://localhost:52345/",
  348. "headers": map[string]interface{}{
  349. "Authorization": "Bearer {{.token}}",
  350. },
  351. "oAuth": map[string]interface{}{
  352. "access": map[string]interface{}{
  353. "url": "http://localhost:52345/token",
  354. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  355. "expire": "3600",
  356. },
  357. "refresh": map[string]interface{}{
  358. "url": "http://localhost:52345/refresh",
  359. "headers": map[string]interface{}{
  360. "Authorization": "Bearer {{.token}}",
  361. "RefreshToken": "{{.refresh_token}}",
  362. },
  363. },
  364. },
  365. },
  366. config: &RawConf{
  367. Url: "http://localhost:52345/",
  368. Method: http.MethodGet,
  369. Interval: DefaultInterval,
  370. Timeout: DefaultTimeout,
  371. BodyType: "none",
  372. ResponseType: "code",
  373. InsecureSkipVerify: true,
  374. Headers: map[string]interface{}{
  375. "Authorization": "Bearer {{.token}}",
  376. },
  377. HeadersMap: map[string]string{
  378. "Authorization": "Bearer {{.token}}",
  379. },
  380. OAuth: map[string]map[string]interface{}{
  381. "access": {
  382. "url": "http://localhost:52345/token",
  383. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  384. "expire": "3600",
  385. },
  386. "refresh": {
  387. "url": "http://localhost:52345/refresh",
  388. "headers": map[string]interface{}{
  389. "Authorization": "Bearer {{.token}}",
  390. "RefreshToken": "{{.refresh_token}}",
  391. },
  392. },
  393. },
  394. },
  395. accessConf: &AccessTokenConf{
  396. Url: "http://localhost:52345/token",
  397. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  398. Expire: "3600",
  399. ExpireInSecond: 3600,
  400. },
  401. refreshConf: &RefreshTokenConf{
  402. Url: "http://localhost:52345/refresh",
  403. Headers: map[string]string{
  404. "Authorization": "Bearer {{.token}}",
  405. "RefreshToken": "{{.refresh_token}}",
  406. },
  407. },
  408. tokens: map[string]interface{}{
  409. "token": DefaultToken,
  410. "refresh_token": RefreshToken,
  411. "client_id": "test",
  412. "expires": float64(36000),
  413. },
  414. },
  415. // Wrong auth configs
  416. {
  417. name: "oAuth wrong access token config",
  418. props: map[string]interface{}{
  419. "url": "http://localhost:52345/",
  420. "headers": map[string]interface{}{
  421. "Authorization": "Bearer {{.token}}",
  422. },
  423. "oAuth": map[string]interface{}{
  424. "access": map[string]interface{}{
  425. "url": "http://localhost:52345/token",
  426. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  427. "expire": 3600,
  428. },
  429. },
  430. },
  431. err: errors.New("fail to parse the access properties of oAuth: 1 error(s) decoding:\n\n* 'expire' expected type 'string', got unconvertible type 'int', value: '3600'"),
  432. },
  433. {
  434. name: "oAuth wrong access url",
  435. props: map[string]interface{}{
  436. "url": "http://localhost:52345/",
  437. "headers": map[string]interface{}{
  438. "Authorization": "Bearer {{.token}}",
  439. },
  440. "oAuth": map[string]interface{}{
  441. "access": map[string]interface{}{
  442. "url": "",
  443. },
  444. },
  445. },
  446. config: &RawConf{
  447. Url: "http://localhost:52345/",
  448. Method: http.MethodGet,
  449. Interval: DefaultInterval,
  450. Timeout: DefaultTimeout,
  451. BodyType: "none",
  452. ResponseType: "code",
  453. InsecureSkipVerify: true,
  454. Headers: map[string]interface{}{
  455. "Authorization": "Bearer {{.token}}",
  456. },
  457. HeadersMap: map[string]string{
  458. "Authorization": "Bearer {{.token}}",
  459. },
  460. },
  461. },
  462. {
  463. name: "oAuth miss access",
  464. props: map[string]interface{}{
  465. "url": "http://localhost:52345/",
  466. "headers": map[string]interface{}{
  467. "Authorization": "Bearer {{.token}}",
  468. },
  469. "oAuth": map[string]interface{}{
  470. "refresh": map[string]interface{}{
  471. "url": "http://localhost:52345/",
  472. },
  473. },
  474. },
  475. err: errors.New("if setting oAuth, `access` property is required"),
  476. },
  477. {
  478. name: "oAuth wrong refresh token config",
  479. props: map[string]interface{}{
  480. "url": "http://localhost:52345/",
  481. "headers": map[string]interface{}{
  482. "Authorization": "Bearer {{.token}}",
  483. },
  484. "oAuth": map[string]interface{}{
  485. "access": map[string]interface{}{
  486. "url": "http://localhost:52345/token",
  487. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  488. "expire": "3600",
  489. },
  490. "refresh": map[string]interface{}{
  491. "url": 1202,
  492. },
  493. },
  494. },
  495. accessConf: &AccessTokenConf{
  496. Url: "http://localhost:52345/token",
  497. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  498. Expire: "3600",
  499. ExpireInSecond: 3600,
  500. },
  501. err: errors.New("fail to parse the refresh token properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '1202'"),
  502. },
  503. {
  504. name: "oAuth refresh token missing url",
  505. props: map[string]interface{}{
  506. "url": "http://localhost:52345/",
  507. "headers": map[string]interface{}{
  508. "Authorization": "Bearer {{.token}}",
  509. },
  510. "oAuth": map[string]interface{}{
  511. "access": map[string]interface{}{
  512. "url": "http://localhost:52345/token",
  513. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  514. "expire": "3600",
  515. },
  516. },
  517. },
  518. config: &RawConf{
  519. Url: "http://localhost:52345/",
  520. Method: http.MethodGet,
  521. Interval: DefaultInterval,
  522. Timeout: DefaultTimeout,
  523. BodyType: "none",
  524. ResponseType: "code",
  525. InsecureSkipVerify: true,
  526. Headers: map[string]interface{}{
  527. "Authorization": "Bearer {{.token}}",
  528. },
  529. HeadersMap: map[string]string{
  530. "Authorization": "Bearer {{.token}}",
  531. },
  532. OAuth: map[string]map[string]interface{}{
  533. "access": {
  534. "url": "http://localhost:52345/token",
  535. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  536. "expire": "3600",
  537. },
  538. },
  539. },
  540. accessConf: &AccessTokenConf{
  541. Url: "http://localhost:52345/token",
  542. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  543. Expire: "3600",
  544. ExpireInSecond: 3600,
  545. },
  546. tokens: map[string]interface{}{
  547. "token": DefaultToken,
  548. "refresh_token": RefreshToken,
  549. "client_id": "test",
  550. "expires": float64(36000),
  551. },
  552. },
  553. // oAuth authentication flow errors
  554. {
  555. name: "oAuth auth error",
  556. props: map[string]interface{}{
  557. "url": "http://localhost:52345/",
  558. "headers": map[string]interface{}{
  559. "Authorization": "Bearer {{.token}}",
  560. },
  561. "oAuth": map[string]interface{}{
  562. "access": map[string]interface{}{
  563. "url": "http://localhost:52345/token",
  564. "body": "{\"username\": \"admin\",\"password\": \"1234\"}",
  565. "expire": "3600",
  566. },
  567. },
  568. },
  569. config: &RawConf{
  570. Url: "http://localhost:52345/",
  571. Method: http.MethodGet,
  572. Interval: DefaultInterval,
  573. Timeout: DefaultTimeout,
  574. BodyType: "json",
  575. ResponseType: "code",
  576. InsecureSkipVerify: true,
  577. Headers: map[string]string{
  578. "Authorization": "Bearer {{.token}}",
  579. },
  580. OAuth: map[string]map[string]interface{}{
  581. "access": {
  582. "url": "http://localhost:52345/token",
  583. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  584. "expire": "3600",
  585. },
  586. },
  587. },
  588. accessConf: &AccessTokenConf{
  589. Url: "http://localhost:52345/token",
  590. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  591. Expire: "3600",
  592. ExpireInSecond: 3600,
  593. },
  594. tokens: map[string]interface{}{
  595. "token": DefaultToken,
  596. "refresh_token": RefreshToken,
  597. "client_id": "test",
  598. "expires": float64(36000),
  599. },
  600. err: errors.New("fail to authorize by oAuth: Cannot parse access token response to json: http return code error: 400"),
  601. },
  602. {
  603. name: "oAuth refresh error",
  604. props: map[string]interface{}{
  605. "url": "http://localhost:52345/",
  606. "headers": map[string]interface{}{
  607. "Authorization": "Bearer {{.token}}",
  608. },
  609. "oAuth": map[string]interface{}{
  610. "access": map[string]interface{}{
  611. "url": "http://localhost:52345/token",
  612. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  613. "expire": "3600",
  614. },
  615. "refresh": map[string]interface{}{
  616. "url": "http://localhost:52345/refresh",
  617. "headers": map[string]interface{}{
  618. "Authorization": "Bearer {{.token}}",
  619. "RefreshToken": "{{.token}}",
  620. },
  621. },
  622. },
  623. },
  624. err: errors.New("fail to authorize by oAuth: Cannot parse refresh token response to json: http return code error: 400"),
  625. },
  626. {
  627. name: "oAuth wrong access expire template",
  628. props: map[string]interface{}{
  629. "url": "http://localhost:52345/",
  630. "headers": map[string]interface{}{
  631. "Authorization": "Bearer {{.token}}",
  632. },
  633. "oAuth": map[string]interface{}{
  634. "access": map[string]interface{}{
  635. "url": "http://localhost:52345/token",
  636. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  637. "expire": "{{..expp}}",
  638. },
  639. },
  640. },
  641. err: errors.New("fail to authorize by oAuth: fail to parse the expire time for access token: template: sink:1: unexpected . after term \".\""),
  642. },
  643. {
  644. name: "oAuth wrong access expire type",
  645. props: map[string]interface{}{
  646. "url": "http://localhost:52345/",
  647. "headers": map[string]interface{}{
  648. "Authorization": "Bearer {{.token}}",
  649. },
  650. "oAuth": map[string]interface{}{
  651. "access": map[string]interface{}{
  652. "url": "http://localhost:52345/token",
  653. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  654. "expire": "{{.token}}",
  655. },
  656. },
  657. },
  658. err: errors.New("fail to authorize by oAuth: fail to covert the expire time privatisation for access token: cannot convert string(privatisation) to int"),
  659. },
  660. {
  661. name: "oAuth wrong access url",
  662. props: map[string]interface{}{
  663. "url": "http://localhost:52345/",
  664. "headers": map[string]interface{}{
  665. "Authorization": "Bearer {{.token}}",
  666. },
  667. "oAuth": map[string]interface{}{
  668. "access": map[string]interface{}{
  669. "url": "http:localhost:52345/token",
  670. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  671. "expire": "{{.token}}",
  672. },
  673. },
  674. },
  675. err: errors.New("fail to authorize by oAuth: fail to get access token: Post \"http:localhost:52345/token\": http: no Host in request URL"),
  676. },
  677. {
  678. name: "oAuth wrong refresh header template",
  679. props: map[string]interface{}{
  680. "url": "http://localhost:52345/",
  681. "headers": map[string]interface{}{
  682. "Authorization": "Bearer {{.token}}",
  683. },
  684. "oAuth": map[string]interface{}{
  685. "access": map[string]interface{}{
  686. "url": "http://localhost:52345/token",
  687. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  688. "expire": "3600",
  689. },
  690. "refresh": map[string]interface{}{
  691. "url": "http://localhost:52345/refresh",
  692. "headers": map[string]interface{}{
  693. "Authorization": "Bearer {{.token}}",
  694. "RefreshToken": "{{..token}}",
  695. },
  696. },
  697. },
  698. },
  699. err: errors.New("fail to authorize by oAuth: fail to parse the header for refresh token request RefreshToken: template: sink:1: unexpected . after term \".\""),
  700. },
  701. {
  702. name: "oAuth wrong refresh url",
  703. props: map[string]interface{}{
  704. "url": "http://localhost:52345/",
  705. "headers": map[string]interface{}{
  706. "Authorization": "Bearer {{.token}}",
  707. },
  708. "oAuth": map[string]interface{}{
  709. "access": map[string]interface{}{
  710. "url": "http://localhost:52345/token",
  711. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  712. "expire": "3600",
  713. },
  714. "refresh": map[string]interface{}{
  715. "url": "http:localhost:52345/refresh2",
  716. "headers": map[string]interface{}{
  717. "Authorization": "Bearer {{.token}}",
  718. "RefreshToken": "{{.token}}",
  719. },
  720. },
  721. },
  722. },
  723. err: errors.New("fail to authorize by oAuth: fail to get refresh token: Post \"http:localhost:52345/refresh2\": http: no Host in request URL"),
  724. },
  725. }
  726. server := mockAuthServer()
  727. server.Start()
  728. defer server.Close()
  729. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  730. for i, tt := range tests {
  731. t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
  732. r := &PullSource{}
  733. err := r.Configure("", tt.props)
  734. if err != nil {
  735. if tt.err == nil {
  736. t.Errorf("Expected error: %v", err)
  737. } else {
  738. if err.Error() != tt.err.Error() {
  739. t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
  740. }
  741. }
  742. return
  743. }
  744. if !reflect.DeepEqual(r.config, tt.config) {
  745. t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
  746. }
  747. if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
  748. t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
  749. }
  750. if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
  751. t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
  752. }
  753. if !reflect.DeepEqual(r.tokens, tt.tokens) {
  754. t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
  755. }
  756. })
  757. }
  758. }
  759. func TestPullWithAuth(t *testing.T) {
  760. r := &PullSource{}
  761. server := mockAuthServer()
  762. server.Start()
  763. defer server.Close()
  764. err := r.Configure("data", map[string]interface{}{
  765. "url": "http://localhost:52345/",
  766. "interval": 100,
  767. "headers": map[string]interface{}{
  768. "Authorization": "Bearer {{.token}}",
  769. },
  770. "oAuth": map[string]interface{}{
  771. "access": map[string]interface{}{
  772. "url": "http://localhost:52345/token",
  773. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  774. "expire": "10",
  775. },
  776. "refresh": map[string]interface{}{
  777. "url": "http://localhost:52345/refresh",
  778. "headers": map[string]interface{}{
  779. "Authorization": "Bearer {{.token}}",
  780. "RefreshToken": "{{.refresh_token}}",
  781. },
  782. },
  783. },
  784. })
  785. if err != nil {
  786. t.Errorf(err.Error())
  787. return
  788. }
  789. mc := conf.Clock.(*clock.Mock)
  790. exp := []api.SourceTuple{
  791. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}, mc.Now()),
  792. }
  793. mock.TestSourceOpen(r, exp, t)
  794. }
  795. func TestPullIncremental(t *testing.T) {
  796. r := &PullSource{}
  797. server := mockAuthServer()
  798. server.Start()
  799. defer server.Close()
  800. err := r.Configure("data2", map[string]interface{}{
  801. "url": "http://localhost:52345/",
  802. "interval": 100,
  803. "incremental": true,
  804. "responseType": "body",
  805. })
  806. if err != nil {
  807. t.Errorf(err.Error())
  808. return
  809. }
  810. mc := conf.Clock.(*clock.Mock)
  811. exp := []api.SourceTuple{
  812. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  813. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  814. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  815. }
  816. mock.TestSourceOpen(r, exp, t)
  817. }