httppull_source_test.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package http
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net"
  21. "net/http"
  22. "net/http/httptest"
  23. "path/filepath"
  24. "reflect"
  25. "strconv"
  26. "testing"
  27. "time"
  28. "github.com/gorilla/mux"
  29. "github.com/stretchr/testify/assert"
  30. "github.com/lf-edge/ekuiper/internal/conf"
  31. "github.com/lf-edge/ekuiper/internal/io/mock"
  32. "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
  33. "github.com/lf-edge/ekuiper/internal/xsql"
  34. "github.com/lf-edge/ekuiper/pkg/api"
  35. )
  36. func jsonOut(w http.ResponseWriter, out interface{}) {
  37. w.Header().Add("Content-Type", "application/json")
  38. enc := json.NewEncoder(w)
  39. err := enc.Encode(out)
  40. // Problems encoding
  41. if err != nil {
  42. http.Error(w, err.Error(), http.StatusBadRequest)
  43. }
  44. }
  45. const (
  46. DefaultToken = "privatisation"
  47. RefreshToken = "privaterefresh"
  48. )
  49. // mock http auth server
  50. func mockAuthServer() *httptest.Server {
  51. l, _ := net.Listen("tcp", "127.0.0.1:52345")
  52. router := mux.NewRouter()
  53. i := 0
  54. router.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) {
  55. body := &struct {
  56. Username string `json:"username"`
  57. Password string `json:"password"`
  58. }{}
  59. err := json.NewDecoder(r.Body).Decode(body)
  60. if err != nil {
  61. http.Error(w, err.Error(), http.StatusBadRequest)
  62. }
  63. if body.Username != "admin" || body.Password != "0000" {
  64. http.Error(w, "invalid username or password", http.StatusBadRequest)
  65. }
  66. out := &struct {
  67. Token string `json:"token"`
  68. RefreshToken string `json:"refresh_token"`
  69. ClientId string `json:"client_id"`
  70. Expires int64 `json:"expires"`
  71. }{
  72. Token: DefaultToken,
  73. RefreshToken: RefreshToken,
  74. ClientId: "test",
  75. Expires: 36000,
  76. }
  77. jsonOut(w, out)
  78. }).Methods(http.MethodPost)
  79. router.HandleFunc("/refresh", func(w http.ResponseWriter, r *http.Request) {
  80. token := r.Header.Get("Authorization")
  81. if token != "Bearer "+DefaultToken {
  82. http.Error(w, "invalid token", http.StatusBadRequest)
  83. }
  84. rt := r.Header.Get("RefreshToken")
  85. if rt != RefreshToken {
  86. http.Error(w, "invalid refresh token", http.StatusBadRequest)
  87. }
  88. out := &struct {
  89. Token string `json:"token"`
  90. RefreshToken string `json:"refresh_token"`
  91. ClientId string `json:"client_id"`
  92. Expires int64 `json:"expires"`
  93. }{
  94. Token: DefaultToken,
  95. RefreshToken: RefreshToken,
  96. ClientId: "test",
  97. Expires: 36000,
  98. }
  99. jsonOut(w, out)
  100. }).Methods(http.MethodPost)
  101. router.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
  102. token := r.Header.Get("Authorization")
  103. if token != "Bearer "+DefaultToken {
  104. http.Error(w, "invalid token", http.StatusBadRequest)
  105. }
  106. out := &struct {
  107. DeviceId string `json:"device_id"`
  108. Temperature float64 `json:"temperature"`
  109. Humidity float64 `json:"humidity"`
  110. }{
  111. DeviceId: "device1",
  112. Temperature: 25.5,
  113. Humidity: 60.0,
  114. }
  115. jsonOut(w, out)
  116. }).Methods(http.MethodGet)
  117. // Return same data for 3 times
  118. router.HandleFunc("/data2", func(w http.ResponseWriter, r *http.Request) {
  119. out := &struct {
  120. Code int `json:"code"`
  121. Data struct {
  122. DeviceId string `json:"device_id"`
  123. Temperature float64 `json:"temperature"`
  124. Humidity float64 `json:"humidity"`
  125. } `json:"data"`
  126. }{
  127. Code: 200,
  128. Data: struct {
  129. DeviceId string `json:"device_id"`
  130. Temperature float64 `json:"temperature"`
  131. Humidity float64 `json:"humidity"`
  132. }{
  133. DeviceId: "device" + strconv.Itoa(i/3),
  134. Temperature: 25.5,
  135. Humidity: 60.0,
  136. },
  137. }
  138. i++
  139. jsonOut(w, out)
  140. }).Methods(http.MethodGet)
  141. router.HandleFunc("/data3", func(w http.ResponseWriter, r *http.Request) {
  142. out := []*struct {
  143. Code int `json:"code"`
  144. Data struct {
  145. DeviceId string `json:"device_id"`
  146. Temperature float64 `json:"temperature"`
  147. Humidity float64 `json:"humidity"`
  148. } `json:"data"`
  149. }{
  150. {
  151. Code: 200,
  152. Data: struct {
  153. DeviceId string `json:"device_id"`
  154. Temperature float64 `json:"temperature"`
  155. Humidity float64 `json:"humidity"`
  156. }{
  157. DeviceId: "d1",
  158. Temperature: 25.5,
  159. Humidity: 60.0,
  160. },
  161. },
  162. {
  163. Code: 200,
  164. Data: struct {
  165. DeviceId string `json:"device_id"`
  166. Temperature float64 `json:"temperature"`
  167. Humidity float64 `json:"humidity"`
  168. }{
  169. DeviceId: "d2",
  170. Temperature: 25.5,
  171. Humidity: 60.0,
  172. },
  173. },
  174. }
  175. jsonOut(w, out)
  176. }).Methods(http.MethodGet)
  177. // data4 receives time range in url
  178. router.HandleFunc("/data4", func(w http.ResponseWriter, r *http.Request) {
  179. device := r.URL.Query().Get("device")
  180. s := r.URL.Query().Get("start")
  181. e := r.URL.Query().Get("end")
  182. start, _ := strconv.ParseInt(s, 10, 64)
  183. end, _ := strconv.ParseInt(e, 10, 64)
  184. out := &struct {
  185. Code int `json:"code"`
  186. Data struct {
  187. DeviceId string `json:"device_id"`
  188. Temperature int64 `json:"temperature"`
  189. Humidity int64 `json:"humidity"`
  190. } `json:"data"`
  191. }{
  192. Code: 200,
  193. Data: struct {
  194. DeviceId string `json:"device_id"`
  195. Temperature int64 `json:"temperature"`
  196. Humidity int64 `json:"humidity"`
  197. }{
  198. DeviceId: device,
  199. Temperature: start % 50,
  200. Humidity: end % 100,
  201. },
  202. }
  203. jsonOut(w, out)
  204. }).Methods(http.MethodGet)
  205. // data5 receives time range in body
  206. router.HandleFunc("/data5", func(w http.ResponseWriter, r *http.Request) {
  207. body, err := io.ReadAll(r.Body)
  208. if err != nil {
  209. http.Error(w, "Failed to read request body", http.StatusBadRequest)
  210. return
  211. }
  212. // Create a Person struct to hold the JSON data
  213. var ddd struct {
  214. Device string `json:"device"`
  215. Start int64 `json:"start"`
  216. End int64 `json:"end"`
  217. }
  218. // Unmarshal the JSON data into the Person struct
  219. err = json.Unmarshal(body, &ddd)
  220. if err != nil {
  221. http.Error(w, "Failed to parse JSON", http.StatusBadRequest)
  222. return
  223. }
  224. out := &struct {
  225. Code int `json:"code"`
  226. Data struct {
  227. DeviceId string `json:"device_id"`
  228. Temperature int64 `json:"temperature"`
  229. Humidity int64 `json:"humidity"`
  230. } `json:"data"`
  231. }{
  232. Code: 200,
  233. Data: struct {
  234. DeviceId string `json:"device_id"`
  235. Temperature int64 `json:"temperature"`
  236. Humidity int64 `json:"humidity"`
  237. }{
  238. DeviceId: ddd.Device,
  239. Temperature: ddd.Start % 50,
  240. Humidity: ddd.End % 100,
  241. },
  242. }
  243. jsonOut(w, out)
  244. }).Methods(http.MethodPost)
  245. server := httptest.NewUnstartedServer(router)
  246. err := server.Listener.Close()
  247. if err != nil {
  248. panic(err)
  249. }
  250. server.Listener = l
  251. return server
  252. }
  253. var wrongPath, _ = filepath.Abs("/tmp/wrong")
  254. // Test configure to properties
  255. func TestConfigure(t *testing.T) {
  256. tests := []struct {
  257. name string
  258. props map[string]interface{}
  259. err error
  260. config *RawConf
  261. accessConf *AccessTokenConf
  262. refreshConf *RefreshTokenConf
  263. tokens map[string]interface{}
  264. }{
  265. {
  266. name: "default",
  267. props: map[string]interface{}{
  268. "incremental": true,
  269. "url": "http://localhost:9090/",
  270. },
  271. config: &RawConf{
  272. Incremental: true,
  273. Url: "http://localhost:9090/",
  274. Method: http.MethodGet,
  275. Interval: DefaultInterval,
  276. Timeout: DefaultTimeout,
  277. BodyType: "none",
  278. ResponseType: "code",
  279. InsecureSkipVerify: true,
  280. },
  281. },
  282. // Test wrong properties
  283. {
  284. name: "wrong props",
  285. props: map[string]interface{}{
  286. "incremental": true,
  287. "url": 123,
  288. },
  289. err: fmt.Errorf("fail to parse the properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '123'"),
  290. },
  291. {
  292. name: "empty url",
  293. props: map[string]interface{}{
  294. "incremental": true,
  295. "url": "",
  296. },
  297. err: fmt.Errorf("url is required"),
  298. },
  299. {
  300. name: "wrong method",
  301. props: map[string]interface{}{
  302. "url": "http://localhost:9090/",
  303. "method": "wrong",
  304. },
  305. err: fmt.Errorf("Not supported HTTP method wrong."),
  306. },
  307. {
  308. name: "wrong bodytype",
  309. props: map[string]interface{}{
  310. "url": "http://localhost:9090/",
  311. "bodyType": "wrong",
  312. },
  313. err: fmt.Errorf("Not valid body type value wrong."),
  314. },
  315. {
  316. name: "wrong response type",
  317. props: map[string]interface{}{
  318. "url": "http://localhost:9090/",
  319. "responseType": "wrong",
  320. },
  321. err: fmt.Errorf("Not valid response type value wrong."),
  322. },
  323. {
  324. name: "wrong url",
  325. props: map[string]interface{}{
  326. "url": "http:/localhost:9090/",
  327. },
  328. err: fmt.Errorf("Invalid url, host not found"),
  329. },
  330. {
  331. name: "wrong interval",
  332. props: map[string]interface{}{
  333. "url": "http:/localhost:9090/",
  334. "interval": -2,
  335. },
  336. err: fmt.Errorf("interval must be greater than 0"),
  337. },
  338. {
  339. name: "wrong timeout",
  340. props: map[string]interface{}{
  341. "url": "http:/localhost:9090/",
  342. "timeout": -2,
  343. },
  344. err: fmt.Errorf("timeout must be greater than or equal to 0"),
  345. },
  346. {
  347. name: "wrong tls",
  348. props: map[string]interface{}{
  349. "url": "http://localhost:9090/",
  350. "certificationPath": wrongPath,
  351. },
  352. err: fmt.Errorf(fmt.Sprintf("stat %s: no such file or directory", wrongPath)),
  353. },
  354. // Test oAuth
  355. {
  356. name: "oAuth with access token and constant expire",
  357. props: map[string]interface{}{
  358. "url": "http://localhost:52345/",
  359. "headers": map[string]interface{}{
  360. "Authorization": "Bearer {{.token}}",
  361. },
  362. "oAuth": map[string]interface{}{
  363. "access": map[string]interface{}{
  364. "url": "http://localhost:52345/token",
  365. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  366. "expire": "3600",
  367. },
  368. },
  369. },
  370. config: &RawConf{
  371. Url: "http://localhost:52345/",
  372. Method: http.MethodGet,
  373. Interval: DefaultInterval,
  374. Timeout: DefaultTimeout,
  375. BodyType: "none",
  376. ResponseType: "code",
  377. InsecureSkipVerify: true,
  378. Headers: map[string]interface{}{
  379. "Authorization": "Bearer {{.token}}",
  380. },
  381. HeadersMap: map[string]string{
  382. "Authorization": "Bearer {{.token}}",
  383. },
  384. OAuth: map[string]map[string]interface{}{
  385. "access": {
  386. "url": "http://localhost:52345/token",
  387. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  388. "expire": "3600",
  389. },
  390. },
  391. },
  392. accessConf: &AccessTokenConf{
  393. Url: "http://localhost:52345/token",
  394. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  395. Expire: "3600",
  396. ExpireInSecond: 3600,
  397. },
  398. tokens: map[string]interface{}{
  399. "token": DefaultToken,
  400. "refresh_token": RefreshToken,
  401. "client_id": "test",
  402. "expires": float64(36000),
  403. },
  404. },
  405. {
  406. name: "oAuth with access token and dynamic expire",
  407. props: map[string]interface{}{
  408. "url": "http://localhost:52345/",
  409. "headers": map[string]interface{}{
  410. "Authorization": "Bearer {{.token}}",
  411. },
  412. "oAuth": map[string]interface{}{
  413. "access": map[string]interface{}{
  414. "url": "http://localhost:52345/token",
  415. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  416. "expire": "{{.expires}}",
  417. },
  418. },
  419. },
  420. config: &RawConf{
  421. Url: "http://localhost:52345/",
  422. Method: http.MethodGet,
  423. Interval: DefaultInterval,
  424. Timeout: DefaultTimeout,
  425. BodyType: "none",
  426. ResponseType: "code",
  427. InsecureSkipVerify: true,
  428. Headers: map[string]interface{}{
  429. "Authorization": "Bearer {{.token}}",
  430. },
  431. HeadersMap: map[string]string{
  432. "Authorization": "Bearer {{.token}}",
  433. },
  434. OAuth: map[string]map[string]interface{}{
  435. "access": {
  436. "url": "http://localhost:52345/token",
  437. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  438. "expire": "{{.expires}}",
  439. },
  440. },
  441. },
  442. accessConf: &AccessTokenConf{
  443. Url: "http://localhost:52345/token",
  444. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  445. Expire: "{{.expires}}",
  446. ExpireInSecond: 36000,
  447. },
  448. tokens: map[string]interface{}{
  449. "token": DefaultToken,
  450. "refresh_token": RefreshToken,
  451. "client_id": "test",
  452. "expires": float64(36000),
  453. },
  454. },
  455. {
  456. name: "oAuth with access token and refresh token",
  457. props: map[string]interface{}{
  458. "url": "http://localhost:52345/",
  459. "headers": map[string]interface{}{
  460. "Authorization": "Bearer {{.token}}",
  461. },
  462. "oAuth": map[string]interface{}{
  463. "access": map[string]interface{}{
  464. "url": "http://localhost:52345/token",
  465. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  466. "expire": "3600",
  467. },
  468. "refresh": map[string]interface{}{
  469. "url": "http://localhost:52345/refresh",
  470. "headers": map[string]interface{}{
  471. "Authorization": "Bearer {{.token}}",
  472. "RefreshToken": "{{.refresh_token}}",
  473. },
  474. },
  475. },
  476. },
  477. config: &RawConf{
  478. Url: "http://localhost:52345/",
  479. Method: http.MethodGet,
  480. Interval: DefaultInterval,
  481. Timeout: DefaultTimeout,
  482. BodyType: "none",
  483. ResponseType: "code",
  484. InsecureSkipVerify: true,
  485. Headers: map[string]interface{}{
  486. "Authorization": "Bearer {{.token}}",
  487. },
  488. HeadersMap: map[string]string{
  489. "Authorization": "Bearer {{.token}}",
  490. },
  491. OAuth: map[string]map[string]interface{}{
  492. "access": {
  493. "url": "http://localhost:52345/token",
  494. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  495. "expire": "3600",
  496. },
  497. "refresh": {
  498. "url": "http://localhost:52345/refresh",
  499. "headers": map[string]interface{}{
  500. "Authorization": "Bearer {{.token}}",
  501. "RefreshToken": "{{.refresh_token}}",
  502. },
  503. },
  504. },
  505. },
  506. accessConf: &AccessTokenConf{
  507. Url: "http://localhost:52345/token",
  508. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  509. Expire: "3600",
  510. ExpireInSecond: 3600,
  511. },
  512. refreshConf: &RefreshTokenConf{
  513. Url: "http://localhost:52345/refresh",
  514. Headers: map[string]string{
  515. "Authorization": "Bearer {{.token}}",
  516. "RefreshToken": "{{.refresh_token}}",
  517. },
  518. },
  519. tokens: map[string]interface{}{
  520. "token": DefaultToken,
  521. "refresh_token": RefreshToken,
  522. "client_id": "test",
  523. "expires": float64(36000),
  524. },
  525. },
  526. // Wrong auth configs
  527. {
  528. name: "oAuth wrong access token config",
  529. props: map[string]interface{}{
  530. "url": "http://localhost:52345/",
  531. "headers": map[string]interface{}{
  532. "Authorization": "Bearer {{.token}}",
  533. },
  534. "oAuth": map[string]interface{}{
  535. "access": map[string]interface{}{
  536. "url": "http://localhost:52345/token",
  537. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  538. "expire": 3600,
  539. },
  540. },
  541. },
  542. err: errors.New("fail to parse the access properties of oAuth: 1 error(s) decoding:\n\n* 'expire' expected type 'string', got unconvertible type 'int', value: '3600'"),
  543. },
  544. {
  545. name: "oAuth wrong access url",
  546. props: map[string]interface{}{
  547. "url": "http://localhost:52345/",
  548. "headers": map[string]interface{}{
  549. "Authorization": "Bearer {{.token}}",
  550. },
  551. "oAuth": map[string]interface{}{
  552. "access": map[string]interface{}{
  553. "url": "",
  554. },
  555. },
  556. },
  557. config: &RawConf{
  558. Url: "http://localhost:52345/",
  559. Method: http.MethodGet,
  560. Interval: DefaultInterval,
  561. Timeout: DefaultTimeout,
  562. BodyType: "none",
  563. ResponseType: "code",
  564. InsecureSkipVerify: true,
  565. Headers: map[string]interface{}{
  566. "Authorization": "Bearer {{.token}}",
  567. },
  568. HeadersMap: map[string]string{
  569. "Authorization": "Bearer {{.token}}",
  570. },
  571. },
  572. },
  573. {
  574. name: "oAuth miss access",
  575. props: map[string]interface{}{
  576. "url": "http://localhost:52345/",
  577. "headers": map[string]interface{}{
  578. "Authorization": "Bearer {{.token}}",
  579. },
  580. "oAuth": map[string]interface{}{
  581. "refresh": map[string]interface{}{
  582. "url": "http://localhost:52345/",
  583. },
  584. },
  585. },
  586. err: errors.New("if setting oAuth, `access` property is required"),
  587. },
  588. {
  589. name: "oAuth wrong refresh token config",
  590. props: map[string]interface{}{
  591. "url": "http://localhost:52345/",
  592. "headers": map[string]interface{}{
  593. "Authorization": "Bearer {{.token}}",
  594. },
  595. "oAuth": map[string]interface{}{
  596. "access": map[string]interface{}{
  597. "url": "http://localhost:52345/token",
  598. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  599. "expire": "3600",
  600. },
  601. "refresh": map[string]interface{}{
  602. "url": 1202,
  603. },
  604. },
  605. },
  606. accessConf: &AccessTokenConf{
  607. Url: "http://localhost:52345/token",
  608. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  609. Expire: "3600",
  610. ExpireInSecond: 3600,
  611. },
  612. err: errors.New("fail to parse the refresh token properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '1202'"),
  613. },
  614. {
  615. name: "oAuth refresh token missing url",
  616. props: map[string]interface{}{
  617. "url": "http://localhost:52345/",
  618. "headers": map[string]interface{}{
  619. "Authorization": "Bearer {{.token}}",
  620. },
  621. "oAuth": map[string]interface{}{
  622. "access": map[string]interface{}{
  623. "url": "http://localhost:52345/token",
  624. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  625. "expire": "3600",
  626. },
  627. },
  628. },
  629. config: &RawConf{
  630. Url: "http://localhost:52345/",
  631. Method: http.MethodGet,
  632. Interval: DefaultInterval,
  633. Timeout: DefaultTimeout,
  634. BodyType: "none",
  635. ResponseType: "code",
  636. InsecureSkipVerify: true,
  637. Headers: map[string]interface{}{
  638. "Authorization": "Bearer {{.token}}",
  639. },
  640. HeadersMap: map[string]string{
  641. "Authorization": "Bearer {{.token}}",
  642. },
  643. OAuth: map[string]map[string]interface{}{
  644. "access": {
  645. "url": "http://localhost:52345/token",
  646. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  647. "expire": "3600",
  648. },
  649. },
  650. },
  651. accessConf: &AccessTokenConf{
  652. Url: "http://localhost:52345/token",
  653. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  654. Expire: "3600",
  655. ExpireInSecond: 3600,
  656. },
  657. tokens: map[string]interface{}{
  658. "token": DefaultToken,
  659. "refresh_token": RefreshToken,
  660. "client_id": "test",
  661. "expires": float64(36000),
  662. },
  663. },
  664. // oAuth authentication flow errors
  665. {
  666. name: "oAuth auth error",
  667. props: map[string]interface{}{
  668. "url": "http://localhost:52345/",
  669. "headers": map[string]interface{}{
  670. "Authorization": "Bearer {{.token}}",
  671. },
  672. "oAuth": map[string]interface{}{
  673. "access": map[string]interface{}{
  674. "url": "http://localhost:52345/token",
  675. "body": "{\"username\": \"admin\",\"password\": \"1234\"}",
  676. "expire": "3600",
  677. },
  678. },
  679. },
  680. config: &RawConf{
  681. Url: "http://localhost:52345/",
  682. Method: http.MethodGet,
  683. Interval: DefaultInterval,
  684. Timeout: DefaultTimeout,
  685. BodyType: "json",
  686. ResponseType: "code",
  687. InsecureSkipVerify: true,
  688. Headers: map[string]string{
  689. "Authorization": "Bearer {{.token}}",
  690. },
  691. OAuth: map[string]map[string]interface{}{
  692. "access": {
  693. "url": "http://localhost:52345/token",
  694. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  695. "expire": "3600",
  696. },
  697. },
  698. },
  699. accessConf: &AccessTokenConf{
  700. Url: "http://localhost:52345/token",
  701. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  702. Expire: "3600",
  703. ExpireInSecond: 3600,
  704. },
  705. tokens: map[string]interface{}{
  706. "token": DefaultToken,
  707. "refresh_token": RefreshToken,
  708. "client_id": "test",
  709. "expires": float64(36000),
  710. },
  711. err: errors.New("fail to authorize by oAuth: Cannot parse access token response to json: http return code error: 400"),
  712. },
  713. {
  714. name: "oAuth refresh error",
  715. props: map[string]interface{}{
  716. "url": "http://localhost:52345/",
  717. "headers": map[string]interface{}{
  718. "Authorization": "Bearer {{.token}}",
  719. },
  720. "oAuth": map[string]interface{}{
  721. "access": map[string]interface{}{
  722. "url": "http://localhost:52345/token",
  723. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  724. "expire": "3600",
  725. },
  726. "refresh": map[string]interface{}{
  727. "url": "http://localhost:52345/refresh",
  728. "headers": map[string]interface{}{
  729. "Authorization": "Bearer {{.token}}",
  730. "RefreshToken": "{{.token}}",
  731. },
  732. },
  733. },
  734. },
  735. err: errors.New("fail to authorize by oAuth: Cannot parse refresh token response to json: http return code error: 400"),
  736. },
  737. {
  738. name: "oAuth wrong access expire template",
  739. props: map[string]interface{}{
  740. "url": "http://localhost:52345/",
  741. "headers": map[string]interface{}{
  742. "Authorization": "Bearer {{.token}}",
  743. },
  744. "oAuth": map[string]interface{}{
  745. "access": map[string]interface{}{
  746. "url": "http://localhost:52345/token",
  747. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  748. "expire": "{{..expp}}",
  749. },
  750. },
  751. },
  752. err: errors.New("fail to authorize by oAuth: fail to parse the expire time for access token: template: sink:1: unexpected . after term \".\""),
  753. },
  754. {
  755. name: "oAuth wrong access expire type",
  756. props: map[string]interface{}{
  757. "url": "http://localhost:52345/",
  758. "headers": map[string]interface{}{
  759. "Authorization": "Bearer {{.token}}",
  760. },
  761. "oAuth": map[string]interface{}{
  762. "access": map[string]interface{}{
  763. "url": "http://localhost:52345/token",
  764. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  765. "expire": "{{.token}}",
  766. },
  767. },
  768. },
  769. err: errors.New("fail to authorize by oAuth: fail to covert the expire time privatisation for access token: cannot convert string(privatisation) to int"),
  770. },
  771. {
  772. name: "oAuth wrong access url",
  773. props: map[string]interface{}{
  774. "url": "http://localhost:52345/",
  775. "headers": map[string]interface{}{
  776. "Authorization": "Bearer {{.token}}",
  777. },
  778. "oAuth": map[string]interface{}{
  779. "access": map[string]interface{}{
  780. "url": "http:localhost:52345/token",
  781. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  782. "expire": "{{.token}}",
  783. },
  784. },
  785. },
  786. err: errors.New("fail to authorize by oAuth: fail to get access token: Post \"http:localhost:52345/token\": http: no Host in request URL"),
  787. },
  788. {
  789. name: "oAuth wrong refresh header template",
  790. props: map[string]interface{}{
  791. "url": "http://localhost:52345/",
  792. "headers": map[string]interface{}{
  793. "Authorization": "Bearer {{.token}}",
  794. },
  795. "oAuth": map[string]interface{}{
  796. "access": map[string]interface{}{
  797. "url": "http://localhost:52345/token",
  798. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  799. "expire": "3600",
  800. },
  801. "refresh": map[string]interface{}{
  802. "url": "http://localhost:52345/refresh",
  803. "headers": map[string]interface{}{
  804. "Authorization": "Bearer {{.token}}",
  805. "RefreshToken": "{{..token}}",
  806. },
  807. },
  808. },
  809. },
  810. err: errors.New("fail to authorize by oAuth: fail to parse the header for refresh token request RefreshToken: template: sink:1: unexpected . after term \".\""),
  811. },
  812. {
  813. name: "oAuth wrong refresh url",
  814. props: map[string]interface{}{
  815. "url": "http://localhost:52345/",
  816. "headers": map[string]interface{}{
  817. "Authorization": "Bearer {{.token}}",
  818. },
  819. "oAuth": map[string]interface{}{
  820. "access": map[string]interface{}{
  821. "url": "http://localhost:52345/token",
  822. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  823. "expire": "3600",
  824. },
  825. "refresh": map[string]interface{}{
  826. "url": "http:localhost:52345/refresh2",
  827. "headers": map[string]interface{}{
  828. "Authorization": "Bearer {{.token}}",
  829. "RefreshToken": "{{.token}}",
  830. },
  831. },
  832. },
  833. },
  834. err: errors.New("fail to authorize by oAuth: fail to get refresh token: Post \"http:localhost:52345/refresh2\": http: no Host in request URL"),
  835. },
  836. }
  837. server := mockAuthServer()
  838. server.Start()
  839. defer server.Close()
  840. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  841. for i, tt := range tests {
  842. t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
  843. r := &PullSource{}
  844. err := r.Configure("", tt.props)
  845. if err != nil {
  846. if tt.err == nil {
  847. t.Errorf("Expected error: %v", err)
  848. } else {
  849. if err.Error() != tt.err.Error() {
  850. t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
  851. }
  852. }
  853. return
  854. }
  855. if !reflect.DeepEqual(r.config, tt.config) {
  856. t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
  857. }
  858. if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
  859. t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
  860. }
  861. if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
  862. t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
  863. }
  864. if !reflect.DeepEqual(r.tokens, tt.tokens) {
  865. t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
  866. }
  867. })
  868. }
  869. }
  870. func TestPullWithAuth(t *testing.T) {
  871. conf.IsTesting = false
  872. conf.InitClock()
  873. r := &PullSource{}
  874. server := mockAuthServer()
  875. server.Start()
  876. defer server.Close()
  877. err := r.Configure("data", map[string]interface{}{
  878. "url": "http://localhost:52345/",
  879. "interval": 100,
  880. "headers": map[string]interface{}{
  881. "Authorization": "Bearer {{.token}}",
  882. },
  883. "oAuth": map[string]interface{}{
  884. "access": map[string]interface{}{
  885. "url": "http://localhost:52345/token",
  886. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  887. "expire": "10",
  888. },
  889. "refresh": map[string]interface{}{
  890. "url": "http://localhost:52345/refresh",
  891. "headers": map[string]interface{}{
  892. "Authorization": "Bearer {{.token}}",
  893. "RefreshToken": "{{.refresh_token}}",
  894. },
  895. },
  896. },
  897. })
  898. if err != nil {
  899. t.Errorf(err.Error())
  900. return
  901. }
  902. mc := conf.Clock
  903. exp := []api.SourceTuple{
  904. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}, mc.Now()),
  905. }
  906. mock.TestSourceOpen(r, exp, t)
  907. }
  908. func TestPullIncremental(t *testing.T) {
  909. conf.IsTesting = false
  910. conf.InitClock()
  911. r := &PullSource{}
  912. server := mockAuthServer()
  913. server.Start()
  914. defer server.Close()
  915. err := r.Configure("data2", map[string]interface{}{
  916. "url": "http://localhost:52345/",
  917. "interval": 100,
  918. "incremental": true,
  919. "responseType": "body",
  920. })
  921. if err != nil {
  922. t.Errorf(err.Error())
  923. return
  924. }
  925. mc := conf.Clock
  926. exp := []api.SourceTuple{
  927. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  928. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  929. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  930. }
  931. mock.TestSourceOpen(r, exp, t)
  932. }
  933. func TestPullJsonList(t *testing.T) {
  934. conf.IsTesting = false
  935. conf.InitClock()
  936. r := &PullSource{}
  937. server := mockAuthServer()
  938. server.Start()
  939. defer server.Close()
  940. err := r.Configure("data3", map[string]interface{}{
  941. "url": "http://localhost:52345/",
  942. "interval": 100,
  943. "responseType": "body",
  944. })
  945. if err != nil {
  946. t.Errorf(err.Error())
  947. return
  948. }
  949. mc := conf.Clock
  950. exp := []api.SourceTuple{
  951. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  952. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
  953. }
  954. mock.TestSourceOpen(r, exp, t)
  955. }
  956. func TestPullUrlTimeRange(t *testing.T) {
  957. r := &PullSource{}
  958. server := mockAuthServer()
  959. server.Start()
  960. defer server.Close()
  961. err := r.Configure("", map[string]interface{}{
  962. "url": "http://localhost:52345/data4?device=d1&start={{.LastPullTime}}&end={{.PullTime}}",
  963. "interval": 110,
  964. "responseType": "body",
  965. })
  966. if err != nil {
  967. t.Errorf(err.Error())
  968. return
  969. }
  970. // Mock time
  971. mockclock.ResetClock(143)
  972. exp := []api.SourceTuple{
  973. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(53), "temperature": float64(43)}}, map[string]interface{}{}, time.UnixMilli(253)),
  974. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(63), "temperature": float64(3)}}, map[string]interface{}{}, time.UnixMilli(363)),
  975. }
  976. c := mockclock.GetMockClock()
  977. go func() {
  978. time.Sleep(10 * time.Millisecond)
  979. c.Add(350 * time.Millisecond)
  980. }()
  981. mock.TestSourceOpen(r, exp, t)
  982. }
  983. func TestPullBodyTimeRange(t *testing.T) {
  984. r := &PullSource{}
  985. server := mockAuthServer()
  986. server.Start()
  987. defer server.Close()
  988. err := r.Configure("data5", map[string]interface{}{
  989. "url": "http://localhost:52345/",
  990. "interval": 110,
  991. "responseType": "body",
  992. "method": "POST",
  993. "body": `{"device": "d1", "start": {{.LastPullTime}}, "end": {{.PullTime}}}`,
  994. })
  995. if err != nil {
  996. t.Errorf(err.Error())
  997. return
  998. }
  999. // Mock time
  1000. mockclock.ResetClock(143)
  1001. exp := []api.SourceTuple{
  1002. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(53), "temperature": float64(43)}}, map[string]interface{}{}, time.UnixMilli(253)),
  1003. api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(63), "temperature": float64(3)}}, map[string]interface{}{}, time.UnixMilli(363)),
  1004. }
  1005. c := mockclock.GetMockClock()
  1006. go func() {
  1007. time.Sleep(10 * time.Millisecond)
  1008. c.Add(350 * time.Millisecond)
  1009. }()
  1010. mock.TestSourceOpen(r, exp, t)
  1011. }
  1012. func TestPullErrorTest(t *testing.T) {
  1013. conf.IsTesting = false
  1014. conf.InitClock()
  1015. tests := []struct {
  1016. name string
  1017. conf map[string]interface{}
  1018. exp []api.SourceTuple
  1019. }{
  1020. {
  1021. name: "wrong url template",
  1022. conf: map[string]interface{}{"url": "http://localhost:52345/data4?device=d1&start={{.lastPullTime}}&end={{.PullTime}", "interval": 10},
  1023. exp: []api.SourceTuple{
  1024. &xsql.ErrorSourceTuple{
  1025. Error: errors.New("parse url http://localhost:52345/data4?device=d1&start={{.lastPullTime}}&end={{.PullTime} error template: sink:1: bad character U+007D '}'"),
  1026. },
  1027. },
  1028. }, {
  1029. name: "wrong header template",
  1030. conf: map[string]interface{}{"url": "http://localhost:52345/data4", "interval": 10, "HeadersTemplate": "\"Authorization\": \"Bearer {{.aatoken}}"},
  1031. exp: []api.SourceTuple{
  1032. &xsql.ErrorSourceTuple{
  1033. Error: errors.New("parse headers error parsed header template is not json: \"Authorization\": \"Bearer <no value>"),
  1034. },
  1035. },
  1036. }, {
  1037. name: "wrong body template",
  1038. conf: map[string]interface{}{"url": "http://localhost:52345/data4", "interval": 10, "body": `{"device": "d1", "start": {{.LastPullTime}}, "end": {{.pullTime}}}`},
  1039. exp: []api.SourceTuple{
  1040. &xsql.ErrorSourceTuple{
  1041. Error: errors.New("parse body {\"device\": \"d1\", \"start\": {{.LastPullTime}}, \"end\": {{.pullTime}}} error template: sink:1:54: executing \"sink\" at <.pullTime>: can't evaluate field pullTime in type *http.pullTimeMeta"),
  1042. },
  1043. },
  1044. }, {
  1045. name: "wrong response",
  1046. conf: map[string]interface{}{"url": "http://localhost:52345/aa/data4", "interval": 10},
  1047. exp: []api.SourceTuple{
  1048. &xsql.ErrorSourceTuple{
  1049. Error: errors.New("parse response error http return code error: 404"),
  1050. },
  1051. },
  1052. }, {
  1053. name: "wrong request",
  1054. conf: map[string]interface{}{"url": "http://localhost:52345/aa/data4", "interval": 10, "bodyType": "form", "body": "ddd"},
  1055. exp: []api.SourceTuple{
  1056. &xsql.ErrorSourceTuple{
  1057. Error: errors.New("send request error invalid content: ddd"),
  1058. },
  1059. },
  1060. },
  1061. }
  1062. server := mockAuthServer()
  1063. server.Start()
  1064. defer server.Close()
  1065. for _, test := range tests {
  1066. t.Run(test.name, func(t *testing.T) {
  1067. r := &PullSource{}
  1068. err := r.Configure("", test.conf)
  1069. assert.NoError(t, err)
  1070. mock.TestSourceOpen(r, test.exp, t)
  1071. })
  1072. }
  1073. }