httppull_source_test.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package http
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "net"
  20. "net/http"
  21. "net/http/httptest"
  22. "path/filepath"
  23. "reflect"
  24. "strconv"
  25. "testing"
  26. "github.com/benbjohnson/clock"
  27. "github.com/gorilla/mux"
  28. "github.com/lf-edge/ekuiper/internal/conf"
  29. "github.com/lf-edge/ekuiper/internal/io/mock"
  30. "github.com/lf-edge/ekuiper/pkg/api"
  31. )
  32. func jsonOut(w http.ResponseWriter, out interface{}) {
  33. w.Header().Add("Content-Type", "application/json")
  34. enc := json.NewEncoder(w)
  35. err := enc.Encode(out)
  36. // Problems encoding
  37. if err != nil {
  38. http.Error(w, err.Error(), http.StatusBadRequest)
  39. }
  40. }
  41. const (
  42. DefaultToken = "privatisation"
  43. RefreshToken = "privaterefresh"
  44. )
  45. // mock http auth server
  46. func mockAuthServer() *httptest.Server {
  47. l, _ := net.Listen("tcp", "127.0.0.1:52345")
  48. router := mux.NewRouter()
  49. i := 0
  50. router.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) {
  51. body := &struct {
  52. Username string `json:"username"`
  53. Password string `json:"password"`
  54. }{}
  55. err := json.NewDecoder(r.Body).Decode(body)
  56. if err != nil {
  57. http.Error(w, err.Error(), http.StatusBadRequest)
  58. }
  59. if body.Username != "admin" || body.Password != "0000" {
  60. http.Error(w, "invalid username or password", http.StatusBadRequest)
  61. }
  62. out := &struct {
  63. Token string `json:"token"`
  64. RefreshToken string `json:"refresh_token"`
  65. ClientId string `json:"client_id"`
  66. Expires int64 `json:"expires"`
  67. }{
  68. Token: DefaultToken,
  69. RefreshToken: RefreshToken,
  70. ClientId: "test",
  71. Expires: 36000,
  72. }
  73. jsonOut(w, out)
  74. }).Methods(http.MethodPost)
  75. router.HandleFunc("/refresh", func(w http.ResponseWriter, r *http.Request) {
  76. token := r.Header.Get("Authorization")
  77. if token != "Bearer "+DefaultToken {
  78. http.Error(w, "invalid token", http.StatusBadRequest)
  79. }
  80. rt := r.Header.Get("RefreshToken")
  81. if rt != RefreshToken {
  82. http.Error(w, "invalid refresh token", http.StatusBadRequest)
  83. }
  84. out := &struct {
  85. Token string `json:"token"`
  86. RefreshToken string `json:"refresh_token"`
  87. ClientId string `json:"client_id"`
  88. Expires int64 `json:"expires"`
  89. }{
  90. Token: DefaultToken,
  91. RefreshToken: RefreshToken,
  92. ClientId: "test",
  93. Expires: 36000,
  94. }
  95. jsonOut(w, out)
  96. }).Methods(http.MethodPost)
  97. router.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
  98. token := r.Header.Get("Authorization")
  99. if token != "Bearer "+DefaultToken {
  100. http.Error(w, "invalid token", http.StatusBadRequest)
  101. }
  102. out := &struct {
  103. DeviceId string `json:"device_id"`
  104. Temperature float64 `json:"temperature"`
  105. Humidity float64 `json:"humidity"`
  106. }{
  107. DeviceId: "device1",
  108. Temperature: 25.5,
  109. Humidity: 60.0,
  110. }
  111. jsonOut(w, out)
  112. }).Methods(http.MethodGet)
  113. // Return same data for 3 times
  114. router.HandleFunc("/data2", func(w http.ResponseWriter, r *http.Request) {
  115. out := &struct {
  116. Code int `json:"code"`
  117. Data struct {
  118. DeviceId string `json:"device_id"`
  119. Temperature float64 `json:"temperature"`
  120. Humidity float64 `json:"humidity"`
  121. } `json:"data"`
  122. }{
  123. Code: 200,
  124. Data: struct {
  125. DeviceId string `json:"device_id"`
  126. Temperature float64 `json:"temperature"`
  127. Humidity float64 `json:"humidity"`
  128. }{
  129. DeviceId: "device" + strconv.Itoa(i/3),
  130. Temperature: 25.5,
  131. Humidity: 60.0,
  132. },
  133. }
  134. i++
  135. jsonOut(w, out)
  136. }).Methods(http.MethodGet)
  137. router.HandleFunc("/data3", func(w http.ResponseWriter, r *http.Request) {
  138. out := []*struct {
  139. Code int `json:"code"`
  140. Data struct {
  141. DeviceId string `json:"device_id"`
  142. Temperature float64 `json:"temperature"`
  143. Humidity float64 `json:"humidity"`
  144. } `json:"data"`
  145. }{
  146. {
  147. Code: 200,
  148. Data: struct {
  149. DeviceId string `json:"device_id"`
  150. Temperature float64 `json:"temperature"`
  151. Humidity float64 `json:"humidity"`
  152. }{
  153. DeviceId: "d1",
  154. Temperature: 25.5,
  155. Humidity: 60.0,
  156. },
  157. },
  158. {
  159. Code: 200,
  160. Data: struct {
  161. DeviceId string `json:"device_id"`
  162. Temperature float64 `json:"temperature"`
  163. Humidity float64 `json:"humidity"`
  164. }{
  165. DeviceId: "d2",
  166. Temperature: 25.5,
  167. Humidity: 60.0,
  168. },
  169. },
  170. }
  171. jsonOut(w, out)
  172. }).Methods(http.MethodGet)
  173. server := httptest.NewUnstartedServer(router)
  174. server.Listener.Close()
  175. server.Listener = l
  176. return server
  177. }
  178. var wrongPath, _ = filepath.Abs("/tmp/wrong")
  179. // Test configure to properties
  180. func TestConfigure(t *testing.T) {
  181. tests := []struct {
  182. name string
  183. props map[string]interface{}
  184. err error
  185. config *RawConf
  186. accessConf *AccessTokenConf
  187. refreshConf *RefreshTokenConf
  188. tokens map[string]interface{}
  189. }{
  190. {
  191. name: "default",
  192. props: map[string]interface{}{
  193. "incremental": true,
  194. "url": "http://localhost:9090/",
  195. },
  196. config: &RawConf{
  197. Incremental: true,
  198. Url: "http://localhost:9090/",
  199. Method: http.MethodGet,
  200. Interval: DefaultInterval,
  201. Timeout: DefaultTimeout,
  202. BodyType: "none",
  203. ResponseType: "code",
  204. InsecureSkipVerify: true,
  205. },
  206. },
  207. // Test wrong properties
  208. {
  209. name: "wrong props",
  210. props: map[string]interface{}{
  211. "incremental": true,
  212. "url": 123,
  213. },
  214. err: fmt.Errorf("fail to parse the properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '123'"),
  215. },
  216. {
  217. name: "empty url",
  218. props: map[string]interface{}{
  219. "incremental": true,
  220. "url": "",
  221. },
  222. err: fmt.Errorf("url is required"),
  223. },
  224. {
  225. name: "wrong method",
  226. props: map[string]interface{}{
  227. "url": "http://localhost:9090/",
  228. "method": "wrong",
  229. },
  230. err: fmt.Errorf("Not supported HTTP method wrong."),
  231. },
  232. {
  233. name: "wrong bodytype",
  234. props: map[string]interface{}{
  235. "url": "http://localhost:9090/",
  236. "bodyType": "wrong",
  237. },
  238. err: fmt.Errorf("Not valid body type value wrong."),
  239. },
  240. {
  241. name: "wrong response type",
  242. props: map[string]interface{}{
  243. "url": "http://localhost:9090/",
  244. "responseType": "wrong",
  245. },
  246. err: fmt.Errorf("Not valid response type value wrong."),
  247. },
  248. {
  249. name: "wrong url",
  250. props: map[string]interface{}{
  251. "url": "http:/localhost:9090/",
  252. },
  253. err: fmt.Errorf("Invalid url, host not found"),
  254. },
  255. {
  256. name: "wrong interval",
  257. props: map[string]interface{}{
  258. "url": "http:/localhost:9090/",
  259. "interval": -2,
  260. },
  261. err: fmt.Errorf("interval must be greater than 0"),
  262. },
  263. {
  264. name: "wrong timeout",
  265. props: map[string]interface{}{
  266. "url": "http:/localhost:9090/",
  267. "timeout": -2,
  268. },
  269. err: fmt.Errorf("timeout must be greater than or equal to 0"),
  270. },
  271. {
  272. name: "wrong tls",
  273. props: map[string]interface{}{
  274. "url": "http://localhost:9090/",
  275. "certificationPath": wrongPath,
  276. },
  277. err: fmt.Errorf(fmt.Sprintf("stat %s: no such file or directory", wrongPath)),
  278. },
  279. // Test oAuth
  280. {
  281. name: "oAuth with access token and constant expire",
  282. props: map[string]interface{}{
  283. "url": "http://localhost:52345/",
  284. "headers": map[string]interface{}{
  285. "Authorization": "Bearer {{.token}}",
  286. },
  287. "oAuth": map[string]interface{}{
  288. "access": map[string]interface{}{
  289. "url": "http://localhost:52345/token",
  290. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  291. "expire": "3600",
  292. },
  293. },
  294. },
  295. config: &RawConf{
  296. Url: "http://localhost:52345/",
  297. Method: http.MethodGet,
  298. Interval: DefaultInterval,
  299. Timeout: DefaultTimeout,
  300. BodyType: "none",
  301. ResponseType: "code",
  302. InsecureSkipVerify: true,
  303. Headers: map[string]interface{}{
  304. "Authorization": "Bearer {{.token}}",
  305. },
  306. HeadersMap: map[string]string{
  307. "Authorization": "Bearer {{.token}}",
  308. },
  309. OAuth: map[string]map[string]interface{}{
  310. "access": {
  311. "url": "http://localhost:52345/token",
  312. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  313. "expire": "3600",
  314. },
  315. },
  316. },
  317. accessConf: &AccessTokenConf{
  318. Url: "http://localhost:52345/token",
  319. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  320. Expire: "3600",
  321. ExpireInSecond: 3600,
  322. },
  323. tokens: map[string]interface{}{
  324. "token": DefaultToken,
  325. "refresh_token": RefreshToken,
  326. "client_id": "test",
  327. "expires": float64(36000),
  328. },
  329. },
  330. {
  331. name: "oAuth with access token and dynamic expire",
  332. props: map[string]interface{}{
  333. "url": "http://localhost:52345/",
  334. "headers": map[string]interface{}{
  335. "Authorization": "Bearer {{.token}}",
  336. },
  337. "oAuth": map[string]interface{}{
  338. "access": map[string]interface{}{
  339. "url": "http://localhost:52345/token",
  340. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  341. "expire": "{{.expires}}",
  342. },
  343. },
  344. },
  345. config: &RawConf{
  346. Url: "http://localhost:52345/",
  347. Method: http.MethodGet,
  348. Interval: DefaultInterval,
  349. Timeout: DefaultTimeout,
  350. BodyType: "none",
  351. ResponseType: "code",
  352. InsecureSkipVerify: true,
  353. Headers: map[string]interface{}{
  354. "Authorization": "Bearer {{.token}}",
  355. },
  356. HeadersMap: map[string]string{
  357. "Authorization": "Bearer {{.token}}",
  358. },
  359. OAuth: map[string]map[string]interface{}{
  360. "access": {
  361. "url": "http://localhost:52345/token",
  362. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  363. "expire": "{{.expires}}",
  364. },
  365. },
  366. },
  367. accessConf: &AccessTokenConf{
  368. Url: "http://localhost:52345/token",
  369. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  370. Expire: "{{.expires}}",
  371. ExpireInSecond: 36000,
  372. },
  373. tokens: map[string]interface{}{
  374. "token": DefaultToken,
  375. "refresh_token": RefreshToken,
  376. "client_id": "test",
  377. "expires": float64(36000),
  378. },
  379. },
  380. {
  381. name: "oAuth with access token and refresh token",
  382. props: map[string]interface{}{
  383. "url": "http://localhost:52345/",
  384. "headers": map[string]interface{}{
  385. "Authorization": "Bearer {{.token}}",
  386. },
  387. "oAuth": map[string]interface{}{
  388. "access": map[string]interface{}{
  389. "url": "http://localhost:52345/token",
  390. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  391. "expire": "3600",
  392. },
  393. "refresh": map[string]interface{}{
  394. "url": "http://localhost:52345/refresh",
  395. "headers": map[string]interface{}{
  396. "Authorization": "Bearer {{.token}}",
  397. "RefreshToken": "{{.refresh_token}}",
  398. },
  399. },
  400. },
  401. },
  402. config: &RawConf{
  403. Url: "http://localhost:52345/",
  404. Method: http.MethodGet,
  405. Interval: DefaultInterval,
  406. Timeout: DefaultTimeout,
  407. BodyType: "none",
  408. ResponseType: "code",
  409. InsecureSkipVerify: true,
  410. Headers: map[string]interface{}{
  411. "Authorization": "Bearer {{.token}}",
  412. },
  413. HeadersMap: map[string]string{
  414. "Authorization": "Bearer {{.token}}",
  415. },
  416. OAuth: map[string]map[string]interface{}{
  417. "access": {
  418. "url": "http://localhost:52345/token",
  419. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  420. "expire": "3600",
  421. },
  422. "refresh": {
  423. "url": "http://localhost:52345/refresh",
  424. "headers": map[string]interface{}{
  425. "Authorization": "Bearer {{.token}}",
  426. "RefreshToken": "{{.refresh_token}}",
  427. },
  428. },
  429. },
  430. },
  431. accessConf: &AccessTokenConf{
  432. Url: "http://localhost:52345/token",
  433. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  434. Expire: "3600",
  435. ExpireInSecond: 3600,
  436. },
  437. refreshConf: &RefreshTokenConf{
  438. Url: "http://localhost:52345/refresh",
  439. Headers: map[string]string{
  440. "Authorization": "Bearer {{.token}}",
  441. "RefreshToken": "{{.refresh_token}}",
  442. },
  443. },
  444. tokens: map[string]interface{}{
  445. "token": DefaultToken,
  446. "refresh_token": RefreshToken,
  447. "client_id": "test",
  448. "expires": float64(36000),
  449. },
  450. },
  451. // Wrong auth configs
  452. {
  453. name: "oAuth wrong access token config",
  454. props: map[string]interface{}{
  455. "url": "http://localhost:52345/",
  456. "headers": map[string]interface{}{
  457. "Authorization": "Bearer {{.token}}",
  458. },
  459. "oAuth": map[string]interface{}{
  460. "access": map[string]interface{}{
  461. "url": "http://localhost:52345/token",
  462. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  463. "expire": 3600,
  464. },
  465. },
  466. },
  467. err: errors.New("fail to parse the access properties of oAuth: 1 error(s) decoding:\n\n* 'expire' expected type 'string', got unconvertible type 'int', value: '3600'"),
  468. },
  469. {
  470. name: "oAuth wrong access url",
  471. props: map[string]interface{}{
  472. "url": "http://localhost:52345/",
  473. "headers": map[string]interface{}{
  474. "Authorization": "Bearer {{.token}}",
  475. },
  476. "oAuth": map[string]interface{}{
  477. "access": map[string]interface{}{
  478. "url": "",
  479. },
  480. },
  481. },
  482. config: &RawConf{
  483. Url: "http://localhost:52345/",
  484. Method: http.MethodGet,
  485. Interval: DefaultInterval,
  486. Timeout: DefaultTimeout,
  487. BodyType: "none",
  488. ResponseType: "code",
  489. InsecureSkipVerify: true,
  490. Headers: map[string]interface{}{
  491. "Authorization": "Bearer {{.token}}",
  492. },
  493. HeadersMap: map[string]string{
  494. "Authorization": "Bearer {{.token}}",
  495. },
  496. },
  497. },
  498. {
  499. name: "oAuth miss access",
  500. props: map[string]interface{}{
  501. "url": "http://localhost:52345/",
  502. "headers": map[string]interface{}{
  503. "Authorization": "Bearer {{.token}}",
  504. },
  505. "oAuth": map[string]interface{}{
  506. "refresh": map[string]interface{}{
  507. "url": "http://localhost:52345/",
  508. },
  509. },
  510. },
  511. err: errors.New("if setting oAuth, `access` property is required"),
  512. },
  513. {
  514. name: "oAuth wrong refresh token config",
  515. props: map[string]interface{}{
  516. "url": "http://localhost:52345/",
  517. "headers": map[string]interface{}{
  518. "Authorization": "Bearer {{.token}}",
  519. },
  520. "oAuth": map[string]interface{}{
  521. "access": map[string]interface{}{
  522. "url": "http://localhost:52345/token",
  523. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  524. "expire": "3600",
  525. },
  526. "refresh": map[string]interface{}{
  527. "url": 1202,
  528. },
  529. },
  530. },
  531. accessConf: &AccessTokenConf{
  532. Url: "http://localhost:52345/token",
  533. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  534. Expire: "3600",
  535. ExpireInSecond: 3600,
  536. },
  537. err: errors.New("fail to parse the refresh token properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '1202'"),
  538. },
  539. {
  540. name: "oAuth refresh token missing url",
  541. props: map[string]interface{}{
  542. "url": "http://localhost:52345/",
  543. "headers": map[string]interface{}{
  544. "Authorization": "Bearer {{.token}}",
  545. },
  546. "oAuth": map[string]interface{}{
  547. "access": map[string]interface{}{
  548. "url": "http://localhost:52345/token",
  549. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  550. "expire": "3600",
  551. },
  552. },
  553. },
  554. config: &RawConf{
  555. Url: "http://localhost:52345/",
  556. Method: http.MethodGet,
  557. Interval: DefaultInterval,
  558. Timeout: DefaultTimeout,
  559. BodyType: "none",
  560. ResponseType: "code",
  561. InsecureSkipVerify: true,
  562. Headers: map[string]interface{}{
  563. "Authorization": "Bearer {{.token}}",
  564. },
  565. HeadersMap: map[string]string{
  566. "Authorization": "Bearer {{.token}}",
  567. },
  568. OAuth: map[string]map[string]interface{}{
  569. "access": {
  570. "url": "http://localhost:52345/token",
  571. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  572. "expire": "3600",
  573. },
  574. },
  575. },
  576. accessConf: &AccessTokenConf{
  577. Url: "http://localhost:52345/token",
  578. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  579. Expire: "3600",
  580. ExpireInSecond: 3600,
  581. },
  582. tokens: map[string]interface{}{
  583. "token": DefaultToken,
  584. "refresh_token": RefreshToken,
  585. "client_id": "test",
  586. "expires": float64(36000),
  587. },
  588. },
  589. // oAuth authentication flow errors
  590. {
  591. name: "oAuth auth error",
  592. props: map[string]interface{}{
  593. "url": "http://localhost:52345/",
  594. "headers": map[string]interface{}{
  595. "Authorization": "Bearer {{.token}}",
  596. },
  597. "oAuth": map[string]interface{}{
  598. "access": map[string]interface{}{
  599. "url": "http://localhost:52345/token",
  600. "body": "{\"username\": \"admin\",\"password\": \"1234\"}",
  601. "expire": "3600",
  602. },
  603. },
  604. },
  605. config: &RawConf{
  606. Url: "http://localhost:52345/",
  607. Method: http.MethodGet,
  608. Interval: DefaultInterval,
  609. Timeout: DefaultTimeout,
  610. BodyType: "json",
  611. ResponseType: "code",
  612. InsecureSkipVerify: true,
  613. Headers: map[string]string{
  614. "Authorization": "Bearer {{.token}}",
  615. },
  616. OAuth: map[string]map[string]interface{}{
  617. "access": {
  618. "url": "http://localhost:52345/token",
  619. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  620. "expire": "3600",
  621. },
  622. },
  623. },
  624. accessConf: &AccessTokenConf{
  625. Url: "http://localhost:52345/token",
  626. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  627. Expire: "3600",
  628. ExpireInSecond: 3600,
  629. },
  630. tokens: map[string]interface{}{
  631. "token": DefaultToken,
  632. "refresh_token": RefreshToken,
  633. "client_id": "test",
  634. "expires": float64(36000),
  635. },
  636. err: errors.New("fail to authorize by oAuth: Cannot parse access token response to json: http return code error: 400"),
  637. },
  638. {
  639. name: "oAuth refresh error",
  640. props: map[string]interface{}{
  641. "url": "http://localhost:52345/",
  642. "headers": map[string]interface{}{
  643. "Authorization": "Bearer {{.token}}",
  644. },
  645. "oAuth": map[string]interface{}{
  646. "access": map[string]interface{}{
  647. "url": "http://localhost:52345/token",
  648. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  649. "expire": "3600",
  650. },
  651. "refresh": map[string]interface{}{
  652. "url": "http://localhost:52345/refresh",
  653. "headers": map[string]interface{}{
  654. "Authorization": "Bearer {{.token}}",
  655. "RefreshToken": "{{.token}}",
  656. },
  657. },
  658. },
  659. },
  660. err: errors.New("fail to authorize by oAuth: Cannot parse refresh token response to json: http return code error: 400"),
  661. },
  662. {
  663. name: "oAuth wrong access expire template",
  664. props: map[string]interface{}{
  665. "url": "http://localhost:52345/",
  666. "headers": map[string]interface{}{
  667. "Authorization": "Bearer {{.token}}",
  668. },
  669. "oAuth": map[string]interface{}{
  670. "access": map[string]interface{}{
  671. "url": "http://localhost:52345/token",
  672. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  673. "expire": "{{..expp}}",
  674. },
  675. },
  676. },
  677. err: errors.New("fail to authorize by oAuth: fail to parse the expire time for access token: template: sink:1: unexpected . after term \".\""),
  678. },
  679. {
  680. name: "oAuth wrong access expire type",
  681. props: map[string]interface{}{
  682. "url": "http://localhost:52345/",
  683. "headers": map[string]interface{}{
  684. "Authorization": "Bearer {{.token}}",
  685. },
  686. "oAuth": map[string]interface{}{
  687. "access": map[string]interface{}{
  688. "url": "http://localhost:52345/token",
  689. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  690. "expire": "{{.token}}",
  691. },
  692. },
  693. },
  694. err: errors.New("fail to authorize by oAuth: fail to covert the expire time privatisation for access token: cannot convert string(privatisation) to int"),
  695. },
  696. {
  697. name: "oAuth wrong access url",
  698. props: map[string]interface{}{
  699. "url": "http://localhost:52345/",
  700. "headers": map[string]interface{}{
  701. "Authorization": "Bearer {{.token}}",
  702. },
  703. "oAuth": map[string]interface{}{
  704. "access": map[string]interface{}{
  705. "url": "http:localhost:52345/token",
  706. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  707. "expire": "{{.token}}",
  708. },
  709. },
  710. },
  711. err: errors.New("fail to authorize by oAuth: fail to get access token: Post \"http:localhost:52345/token\": http: no Host in request URL"),
  712. },
  713. {
  714. name: "oAuth wrong refresh header template",
  715. props: map[string]interface{}{
  716. "url": "http://localhost:52345/",
  717. "headers": map[string]interface{}{
  718. "Authorization": "Bearer {{.token}}",
  719. },
  720. "oAuth": map[string]interface{}{
  721. "access": map[string]interface{}{
  722. "url": "http://localhost:52345/token",
  723. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  724. "expire": "3600",
  725. },
  726. "refresh": map[string]interface{}{
  727. "url": "http://localhost:52345/refresh",
  728. "headers": map[string]interface{}{
  729. "Authorization": "Bearer {{.token}}",
  730. "RefreshToken": "{{..token}}",
  731. },
  732. },
  733. },
  734. },
  735. err: errors.New("fail to authorize by oAuth: fail to parse the header for refresh token request RefreshToken: template: sink:1: unexpected . after term \".\""),
  736. },
  737. {
  738. name: "oAuth wrong refresh url",
  739. props: map[string]interface{}{
  740. "url": "http://localhost:52345/",
  741. "headers": map[string]interface{}{
  742. "Authorization": "Bearer {{.token}}",
  743. },
  744. "oAuth": map[string]interface{}{
  745. "access": map[string]interface{}{
  746. "url": "http://localhost:52345/token",
  747. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  748. "expire": "3600",
  749. },
  750. "refresh": map[string]interface{}{
  751. "url": "http:localhost:52345/refresh2",
  752. "headers": map[string]interface{}{
  753. "Authorization": "Bearer {{.token}}",
  754. "RefreshToken": "{{.token}}",
  755. },
  756. },
  757. },
  758. },
  759. err: errors.New("fail to authorize by oAuth: fail to get refresh token: Post \"http:localhost:52345/refresh2\": http: no Host in request URL"),
  760. },
  761. }
  762. server := mockAuthServer()
  763. server.Start()
  764. defer server.Close()
  765. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  766. for i, tt := range tests {
  767. t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
  768. r := &PullSource{}
  769. err := r.Configure("", tt.props)
  770. if err != nil {
  771. if tt.err == nil {
  772. t.Errorf("Expected error: %v", err)
  773. } else {
  774. if err.Error() != tt.err.Error() {
  775. t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
  776. }
  777. }
  778. return
  779. }
  780. if !reflect.DeepEqual(r.config, tt.config) {
  781. t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
  782. }
  783. if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
  784. t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
  785. }
  786. if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
  787. t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
  788. }
  789. if !reflect.DeepEqual(r.tokens, tt.tokens) {
  790. t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
  791. }
  792. })
  793. }
  794. }
  795. func TestPullWithAuth(t *testing.T) {
  796. r := &PullSource{}
  797. server := mockAuthServer()
  798. server.Start()
  799. defer server.Close()
  800. err := r.Configure("data", map[string]interface{}{
  801. "url": "http://localhost:52345/",
  802. "interval": 100,
  803. "headers": map[string]interface{}{
  804. "Authorization": "Bearer {{.token}}",
  805. },
  806. "oAuth": map[string]interface{}{
  807. "access": map[string]interface{}{
  808. "url": "http://localhost:52345/token",
  809. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  810. "expire": "10",
  811. },
  812. "refresh": map[string]interface{}{
  813. "url": "http://localhost:52345/refresh",
  814. "headers": map[string]interface{}{
  815. "Authorization": "Bearer {{.token}}",
  816. "RefreshToken": "{{.refresh_token}}",
  817. },
  818. },
  819. },
  820. })
  821. if err != nil {
  822. t.Errorf(err.Error())
  823. return
  824. }
  825. mc := conf.Clock.(*clock.Mock)
  826. exp := []api.SourceTuple{
  827. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}, mc.Now()),
  828. }
  829. mock.TestSourceOpen(r, exp, t)
  830. }
  831. func TestPullIncremental(t *testing.T) {
  832. r := &PullSource{}
  833. server := mockAuthServer()
  834. server.Start()
  835. defer server.Close()
  836. err := r.Configure("data2", map[string]interface{}{
  837. "url": "http://localhost:52345/",
  838. "interval": 100,
  839. "incremental": true,
  840. "responseType": "body",
  841. })
  842. if err != nil {
  843. t.Errorf(err.Error())
  844. return
  845. }
  846. mc := conf.Clock.(*clock.Mock)
  847. exp := []api.SourceTuple{
  848. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  849. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  850. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  851. }
  852. mock.TestSourceOpen(r, exp, t)
  853. }
  854. func TestPullJsonList(t *testing.T) {
  855. r := &PullSource{}
  856. server := mockAuthServer()
  857. server.Start()
  858. defer server.Close()
  859. err := r.Configure("data3", map[string]interface{}{
  860. "url": "http://localhost:52345/",
  861. "interval": 100,
  862. "responseType": "body",
  863. })
  864. if err != nil {
  865. t.Errorf(err.Error())
  866. return
  867. }
  868. mc := conf.Clock.(*clock.Mock)
  869. exp := []api.SourceTuple{
  870. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  871. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  872. }
  873. mock.TestSourceOpen(r, exp, t)
  874. }