12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100 |
- // Copyright 2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package http
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "net/http/httptest"
- "path/filepath"
- "reflect"
- "strconv"
- "testing"
- "time"
- "github.com/gorilla/mux"
- "github.com/stretchr/testify/assert"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/io/mock"
- "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
- "github.com/lf-edge/ekuiper/internal/xsql"
- "github.com/lf-edge/ekuiper/pkg/api"
- )
// jsonOut writes out to w as a JSON document and labels the response body as
// application/json.
//
// If out cannot be encoded, an error response carrying the encoder's message
// is sent instead. That is a fault of the server side rather than of the
// request, so it is reported as 500 Internal Server Error.
func jsonOut(w http.ResponseWriter, out interface{}) {
	// Set (not Add) so a previously assigned Content-Type is replaced rather
	// than duplicated.
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(out); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}
- const (
- DefaultToken = "privatisation" // access token issued by the mock auth server and required as the Bearer credential on /refresh and /data
- RefreshToken = "privaterefresh" // refresh token issued by the mock auth server and required in the RefreshToken header on /refresh
- )
- // mock http auth server
- func mockAuthServer() *httptest.Server {
- l, _ := net.Listen("tcp", "127.0.0.1:52345")
- router := mux.NewRouter()
- i := 0
- router.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) {
- body := &struct {
- Username string `json:"username"`
- Password string `json:"password"`
- }{}
- err := json.NewDecoder(r.Body).Decode(body)
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- }
- if body.Username != "admin" || body.Password != "0000" {
- http.Error(w, "invalid username or password", http.StatusBadRequest)
- }
- out := &struct {
- Token string `json:"token"`
- RefreshToken string `json:"refresh_token"`
- ClientId string `json:"client_id"`
- Expires int64 `json:"expires"`
- }{
- Token: DefaultToken,
- RefreshToken: RefreshToken,
- ClientId: "test",
- Expires: 36000,
- }
- jsonOut(w, out)
- }).Methods(http.MethodPost)
- router.HandleFunc("/refresh", func(w http.ResponseWriter, r *http.Request) {
- token := r.Header.Get("Authorization")
- if token != "Bearer "+DefaultToken {
- http.Error(w, "invalid token", http.StatusBadRequest)
- }
- rt := r.Header.Get("RefreshToken")
- if rt != RefreshToken {
- http.Error(w, "invalid refresh token", http.StatusBadRequest)
- }
- out := &struct {
- Token string `json:"token"`
- RefreshToken string `json:"refresh_token"`
- ClientId string `json:"client_id"`
- Expires int64 `json:"expires"`
- }{
- Token: DefaultToken,
- RefreshToken: RefreshToken,
- ClientId: "test",
- Expires: 36000,
- }
- jsonOut(w, out)
- }).Methods(http.MethodPost)
- router.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
- token := r.Header.Get("Authorization")
- if token != "Bearer "+DefaultToken {
- http.Error(w, "invalid token", http.StatusBadRequest)
- }
- out := &struct {
- DeviceId string `json:"device_id"`
- Temperature float64 `json:"temperature"`
- Humidity float64 `json:"humidity"`
- }{
- DeviceId: "device1",
- Temperature: 25.5,
- Humidity: 60.0,
- }
- jsonOut(w, out)
- }).Methods(http.MethodGet)
- // Return same data for 3 times
- router.HandleFunc("/data2", func(w http.ResponseWriter, r *http.Request) {
- out := &struct {
- Code int `json:"code"`
- Data struct {
- DeviceId string `json:"device_id"`
- Temperature float64 `json:"temperature"`
- Humidity float64 `json:"humidity"`
- } `json:"data"`
- }{
- Code: 200,
- Data: struct {
- DeviceId string `json:"device_id"`
- Temperature float64 `json:"temperature"`
- Humidity float64 `json:"humidity"`
- }{
- DeviceId: "device" + strconv.Itoa(i/3),
- Temperature: 25.5,
- Humidity: 60.0,
- },
- }
- i++
- jsonOut(w, out)
- }).Methods(http.MethodGet)
- router.HandleFunc("/data3", func(w http.ResponseWriter, r *http.Request) {
- out := []*struct {
- Code int `json:"code"`
- Data struct {
- DeviceId string `json:"device_id"`
- Temperature float64 `json:"temperature"`
- Humidity float64 `json:"humidity"`
- } `json:"data"`
- }{
- {
- Code: 200,
- Data: struct {
- DeviceId string `json:"device_id"`
- Temperature float64 `json:"temperature"`
- Humidity float64 `json:"humidity"`
- }{
- DeviceId: "d1",
- Temperature: 25.5,
- Humidity: 60.0,
- },
- },
- {
- Code: 200,
- Data: struct {
- DeviceId string `json:"device_id"`
- Temperature float64 `json:"temperature"`
- Humidity float64 `json:"humidity"`
- }{
- DeviceId: "d2",
- Temperature: 25.5,
- Humidity: 60.0,
- },
- },
- }
- jsonOut(w, out)
- }).Methods(http.MethodGet)
- // data4 receives time range in url
- router.HandleFunc("/data4", func(w http.ResponseWriter, r *http.Request) {
- device := r.URL.Query().Get("device")
- s := r.URL.Query().Get("start")
- e := r.URL.Query().Get("end")
- start, _ := strconv.ParseInt(s, 10, 64)
- end, _ := strconv.ParseInt(e, 10, 64)
- out := &struct {
- Code int `json:"code"`
- Data struct {
- DeviceId string `json:"device_id"`
- Temperature int64 `json:"temperature"`
- Humidity int64 `json:"humidity"`
- } `json:"data"`
- }{
- Code: 200,
- Data: struct {
- DeviceId string `json:"device_id"`
- Temperature int64 `json:"temperature"`
- Humidity int64 `json:"humidity"`
- }{
- DeviceId: device,
- Temperature: start % 50,
- Humidity: end % 100,
- },
- }
- jsonOut(w, out)
- }).Methods(http.MethodGet)
- // data5 receives time range in body
- router.HandleFunc("/data5", func(w http.ResponseWriter, r *http.Request) {
- body, err := io.ReadAll(r.Body)
- if err != nil {
- http.Error(w, "Failed to read request body", http.StatusBadRequest)
- return
- }
- // Create a Person struct to hold the JSON data
- var ddd struct {
- Device string `json:"device"`
- Start int64 `json:"start"`
- End int64 `json:"end"`
- }
- // Unmarshal the JSON data into the Person struct
- err = json.Unmarshal(body, &ddd)
- if err != nil {
- http.Error(w, "Failed to parse JSON", http.StatusBadRequest)
- return
- }
- out := &struct {
- Code int `json:"code"`
- Data struct {
- DeviceId string `json:"device_id"`
- Temperature int64 `json:"temperature"`
- Humidity int64 `json:"humidity"`
- } `json:"data"`
- }{
- Code: 200,
- Data: struct {
- DeviceId string `json:"device_id"`
- Temperature int64 `json:"temperature"`
- Humidity int64 `json:"humidity"`
- }{
- DeviceId: ddd.Device,
- Temperature: ddd.Start % 50,
- Humidity: ddd.End % 100,
- },
- }
- jsonOut(w, out)
- }).Methods(http.MethodPost)
- server := httptest.NewUnstartedServer(router)
- err := server.Listener.Close()
- if err != nil {
- panic(err)
- }
- server.Listener = l
- return server
- }
- var wrongPath, _ = filepath.Abs("/tmp/wrong") // absolute form of /tmp/wrong, used as the certificationPath of the "wrong tls" case, which expects a "no such file or directory" error for it
- // Test configure to properties
- func TestConfigure(t *testing.T) {
- tests := []struct {
- name string
- props map[string]interface{}
- err error
- config *RawConf
- accessConf *AccessTokenConf
- refreshConf *RefreshTokenConf
- tokens map[string]interface{}
- }{
- {
- name: "default",
- props: map[string]interface{}{
- "incremental": true,
- "url": "http://localhost:9090/",
- },
- config: &RawConf{
- Incremental: true,
- Url: "http://localhost:9090/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "none",
- ResponseType: "code",
- InsecureSkipVerify: true,
- },
- },
- // Test wrong properties
- {
- name: "wrong props",
- props: map[string]interface{}{
- "incremental": true,
- "url": 123,
- },
- err: fmt.Errorf("fail to parse the properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '123'"),
- },
- {
- name: "empty url",
- props: map[string]interface{}{
- "incremental": true,
- "url": "",
- },
- err: fmt.Errorf("url is required"),
- },
- {
- name: "wrong method",
- props: map[string]interface{}{
- "url": "http://localhost:9090/",
- "method": "wrong",
- },
- err: fmt.Errorf("Not supported HTTP method wrong."),
- },
- {
- name: "wrong bodytype",
- props: map[string]interface{}{
- "url": "http://localhost:9090/",
- "bodyType": "wrong",
- },
- err: fmt.Errorf("Not valid body type value wrong."),
- },
- {
- name: "wrong response type",
- props: map[string]interface{}{
- "url": "http://localhost:9090/",
- "responseType": "wrong",
- },
- err: fmt.Errorf("Not valid response type value wrong."),
- },
- {
- name: "wrong url",
- props: map[string]interface{}{
- "url": "http:/localhost:9090/",
- },
- err: fmt.Errorf("Invalid url, host not found"),
- },
- {
- name: "wrong interval",
- props: map[string]interface{}{
- "url": "http:/localhost:9090/",
- "interval": -2,
- },
- err: fmt.Errorf("interval must be greater than 0"),
- },
- {
- name: "wrong timeout",
- props: map[string]interface{}{
- "url": "http:/localhost:9090/",
- "timeout": -2,
- },
- err: fmt.Errorf("timeout must be greater than or equal to 0"),
- },
- {
- name: "wrong tls",
- props: map[string]interface{}{
- "url": "http://localhost:9090/",
- "certificationPath": wrongPath,
- },
- err: fmt.Errorf(fmt.Sprintf("stat %s: no such file or directory", wrongPath)),
- },
- // Test oAuth
- {
- name: "oAuth with access token and constant expire",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- },
- },
- config: &RawConf{
- Url: "http://localhost:52345/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "none",
- ResponseType: "code",
- InsecureSkipVerify: true,
- Headers: map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- HeadersMap: map[string]string{
- "Authorization": "Bearer {{.token}}",
- },
- OAuth: map[string]map[string]interface{}{
- "access": {
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "3600",
- ExpireInSecond: 3600,
- },
- tokens: map[string]interface{}{
- "token": DefaultToken,
- "refresh_token": RefreshToken,
- "client_id": "test",
- "expires": float64(36000),
- },
- },
- {
- name: "oAuth with access token and dynamic expire",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "{{.expires}}",
- },
- },
- },
- config: &RawConf{
- Url: "http://localhost:52345/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "none",
- ResponseType: "code",
- InsecureSkipVerify: true,
- Headers: map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- HeadersMap: map[string]string{
- "Authorization": "Bearer {{.token}}",
- },
- OAuth: map[string]map[string]interface{}{
- "access": {
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "{{.expires}}",
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "{{.expires}}",
- ExpireInSecond: 36000,
- },
- tokens: map[string]interface{}{
- "token": DefaultToken,
- "refresh_token": RefreshToken,
- "client_id": "test",
- "expires": float64(36000),
- },
- },
- {
- name: "oAuth with access token and refresh token",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": "http://localhost:52345/refresh",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.refresh_token}}",
- },
- },
- },
- },
- config: &RawConf{
- Url: "http://localhost:52345/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "none",
- ResponseType: "code",
- InsecureSkipVerify: true,
- Headers: map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- HeadersMap: map[string]string{
- "Authorization": "Bearer {{.token}}",
- },
- OAuth: map[string]map[string]interface{}{
- "access": {
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": {
- "url": "http://localhost:52345/refresh",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.refresh_token}}",
- },
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "3600",
- ExpireInSecond: 3600,
- },
- refreshConf: &RefreshTokenConf{
- Url: "http://localhost:52345/refresh",
- Headers: map[string]string{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.refresh_token}}",
- },
- },
- tokens: map[string]interface{}{
- "token": DefaultToken,
- "refresh_token": RefreshToken,
- "client_id": "test",
- "expires": float64(36000),
- },
- },
- // Wrong auth configs
- {
- name: "oAuth wrong access token config",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": 3600,
- },
- },
- },
- err: errors.New("fail to parse the access properties of oAuth: 1 error(s) decoding:\n\n* 'expire' expected type 'string', got unconvertible type 'int', value: '3600'"),
- },
- {
- name: "oAuth wrong access url",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "",
- },
- },
- },
- config: &RawConf{
- Url: "http://localhost:52345/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "none",
- ResponseType: "code",
- InsecureSkipVerify: true,
- Headers: map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- HeadersMap: map[string]string{
- "Authorization": "Bearer {{.token}}",
- },
- },
- },
- {
- name: "oAuth miss access",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "refresh": map[string]interface{}{
- "url": "http://localhost:52345/",
- },
- },
- },
- err: errors.New("if setting oAuth, `access` property is required"),
- },
- {
- name: "oAuth wrong refresh token config",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": 1202,
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "3600",
- ExpireInSecond: 3600,
- },
- err: errors.New("fail to parse the refresh token properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '1202'"),
- },
- {
- name: "oAuth refresh token missing url",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- },
- },
- config: &RawConf{
- Url: "http://localhost:52345/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "none",
- ResponseType: "code",
- InsecureSkipVerify: true,
- Headers: map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- HeadersMap: map[string]string{
- "Authorization": "Bearer {{.token}}",
- },
- OAuth: map[string]map[string]interface{}{
- "access": {
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "3600",
- ExpireInSecond: 3600,
- },
- tokens: map[string]interface{}{
- "token": DefaultToken,
- "refresh_token": RefreshToken,
- "client_id": "test",
- "expires": float64(36000),
- },
- },
- // oAuth authentication flow errors
- {
- name: "oAuth auth error",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"1234\"}",
- "expire": "3600",
- },
- },
- },
- config: &RawConf{
- Url: "http://localhost:52345/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "json",
- ResponseType: "code",
- InsecureSkipVerify: true,
- Headers: map[string]string{
- "Authorization": "Bearer {{.token}}",
- },
- OAuth: map[string]map[string]interface{}{
- "access": {
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "3600",
- ExpireInSecond: 3600,
- },
- tokens: map[string]interface{}{
- "token": DefaultToken,
- "refresh_token": RefreshToken,
- "client_id": "test",
- "expires": float64(36000),
- },
- err: errors.New("fail to authorize by oAuth: Cannot parse access token response to json: http return code error: 400"),
- },
- {
- name: "oAuth refresh error",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": "http://localhost:52345/refresh",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.token}}",
- },
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: Cannot parse refresh token response to json: http return code error: 400"),
- },
- {
- name: "oAuth wrong access expire template",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "{{..expp}}",
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: fail to parse the expire time for access token: template: sink:1: unexpected . after term \".\""),
- },
- {
- name: "oAuth wrong access expire type",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "{{.token}}",
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: fail to covert the expire time privatisation for access token: cannot convert string(privatisation) to int"),
- },
- {
- name: "oAuth wrong access url",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http:localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "{{.token}}",
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: fail to get access token: Post \"http:localhost:52345/token\": http: no Host in request URL"),
- },
- {
- name: "oAuth wrong refresh header template",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": "http://localhost:52345/refresh",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{..token}}",
- },
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: fail to parse the header for refresh token request RefreshToken: template: sink:1: unexpected . after term \".\""),
- },
- {
- name: "oAuth wrong refresh url",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": "http:localhost:52345/refresh2",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.token}}",
- },
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: fail to get refresh token: Post \"http:localhost:52345/refresh2\": http: no Host in request URL"),
- },
- }
- server := mockAuthServer()
- server.Start()
- defer server.Close()
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- for i, tt := range tests {
- t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
- r := &PullSource{}
- err := r.Configure("", tt.props)
- if err != nil {
- if tt.err == nil {
- t.Errorf("Expected error: %v", err)
- } else {
- if err.Error() != tt.err.Error() {
- t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
- }
- }
- return
- }
- if !reflect.DeepEqual(r.config, tt.config) {
- t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
- }
- if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
- t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
- }
- if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
- t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
- }
- if !reflect.DeepEqual(r.tokens, tt.tokens) {
- t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
- }
- })
- }
- }
- func TestPullWithAuth(t *testing.T) {
- conf.IsTesting = false
- conf.InitClock()
- r := &PullSource{}
- server := mockAuthServer()
- server.Start()
- defer server.Close()
- err := r.Configure("data", map[string]interface{}{
- "url": "http://localhost:52345/",
- "interval": 100,
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "10",
- },
- "refresh": map[string]interface{}{
- "url": "http://localhost:52345/refresh",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.refresh_token}}",
- },
- },
- },
- })
- if err != nil {
- t.Errorf(err.Error())
- return
- }
- mc := conf.Clock
- exp := []api.SourceTuple{
- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}, mc.Now()),
- }
- mock.TestSourceOpen(r, exp, t)
- }
- func TestPullIncremental(t *testing.T) {
- conf.IsTesting = false
- conf.InitClock()
- r := &PullSource{}
- server := mockAuthServer()
- server.Start()
- defer server.Close()
- err := r.Configure("data2", map[string]interface{}{
- "url": "http://localhost:52345/",
- "interval": 100,
- "incremental": true,
- "responseType": "body",
- })
- if err != nil {
- t.Errorf(err.Error())
- return
- }
- mc := conf.Clock
- exp := []api.SourceTuple{
- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
- }
- mock.TestSourceOpen(r, exp, t)
- }
- func TestPullJsonList(t *testing.T) {
- conf.IsTesting = false
- conf.InitClock()
- r := &PullSource{}
- server := mockAuthServer()
- server.Start()
- defer server.Close()
- err := r.Configure("data3", map[string]interface{}{
- "url": "http://localhost:52345/",
- "interval": 100,
- "responseType": "body",
- })
- if err != nil {
- t.Errorf(err.Error())
- return
- }
- mc := conf.Clock
- exp := []api.SourceTuple{
- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
- }
- mock.TestSourceOpen(r, exp, t)
- }
- func TestPullUrlTimeRange(t *testing.T) {
- r := &PullSource{}
- server := mockAuthServer()
- server.Start()
- defer server.Close()
- err := r.Configure("", map[string]interface{}{
- "url": "http://localhost:52345/data4?device=d1&start={{.LastPullTime}}&end={{.PullTime}}",
- "interval": 110,
- "responseType": "body",
- })
- if err != nil {
- t.Errorf(err.Error())
- return
- }
- // Mock time
- mockclock.ResetClock(143)
- exp := []api.SourceTuple{
- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(53), "temperature": float64(43)}}, map[string]interface{}{}, time.UnixMilli(253)),
- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(63), "temperature": float64(3)}}, map[string]interface{}{}, time.UnixMilli(363)),
- }
- c := mockclock.GetMockClock()
- go func() {
- time.Sleep(10 * time.Millisecond)
- c.Add(350 * time.Millisecond)
- }()
- mock.TestSourceOpen(r, exp, t)
- }
- func TestPullBodyTimeRange(t *testing.T) {
- r := &PullSource{}
- server := mockAuthServer()
- server.Start()
- defer server.Close()
- err := r.Configure("data5", map[string]interface{}{
- "url": "http://localhost:52345/",
- "interval": 110,
- "responseType": "body",
- "method": "POST",
- "body": `{"device": "d1", "start": {{.LastPullTime}}, "end": {{.PullTime}}}`,
- })
- if err != nil {
- t.Errorf(err.Error())
- return
- }
- // Mock time
- mockclock.ResetClock(143)
- exp := []api.SourceTuple{
- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(53), "temperature": float64(43)}}, map[string]interface{}{}, time.UnixMilli(253)),
- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(63), "temperature": float64(3)}}, map[string]interface{}{}, time.UnixMilli(363)),
- }
- c := mockclock.GetMockClock()
- go func() {
- time.Sleep(10 * time.Millisecond)
- c.Add(350 * time.Millisecond)
- }()
- mock.TestSourceOpen(r, exp, t)
- }
- func TestPullErrorTest(t *testing.T) {
- conf.IsTesting = false
- conf.InitClock()
- tests := []struct {
- name string
- conf map[string]interface{}
- exp []api.SourceTuple
- }{
- {
- name: "wrong url template",
- conf: map[string]interface{}{"url": "http://localhost:52345/data4?device=d1&start={{.lastPullTime}}&end={{.PullTime}", "interval": 10},
- exp: []api.SourceTuple{
- &xsql.ErrorSourceTuple{
- Error: errors.New("parse url http://localhost:52345/data4?device=d1&start={{.lastPullTime}}&end={{.PullTime} error template: sink:1: bad character U+007D '}'"),
- },
- },
- }, {
- name: "wrong header template",
- conf: map[string]interface{}{"url": "http://localhost:52345/data4", "interval": 10, "HeadersTemplate": "\"Authorization\": \"Bearer {{.aatoken}}"},
- exp: []api.SourceTuple{
- &xsql.ErrorSourceTuple{
- Error: errors.New("parse headers error parsed header template is not json: \"Authorization\": \"Bearer <no value>"),
- },
- },
- }, {
- name: "wrong body template",
- conf: map[string]interface{}{"url": "http://localhost:52345/data4", "interval": 10, "body": `{"device": "d1", "start": {{.LastPullTime}}, "end": {{.pullTime}}}`},
- exp: []api.SourceTuple{
- &xsql.ErrorSourceTuple{
- Error: errors.New("parse body {\"device\": \"d1\", \"start\": {{.LastPullTime}}, \"end\": {{.pullTime}}} error template: sink:1:54: executing \"sink\" at <.pullTime>: can't evaluate field pullTime in type *http.pullTimeMeta"),
- },
- },
- }, {
- name: "wrong response",
- conf: map[string]interface{}{"url": "http://localhost:52345/aa/data4", "interval": 10},
- exp: []api.SourceTuple{
- &xsql.ErrorSourceTuple{
- Error: errors.New("parse response error http return code error: 404"),
- },
- },
- }, {
- name: "wrong request",
- conf: map[string]interface{}{"url": "http://localhost:52345/aa/data4", "interval": 10, "bodyType": "form", "body": "ddd"},
- exp: []api.SourceTuple{
- &xsql.ErrorSourceTuple{
- Error: errors.New("send request error invalid content: ddd"),
- },
- },
- },
- }
- server := mockAuthServer()
- server.Start()
- defer server.Close()
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- r := &PullSource{}
- err := r.Configure("", test.conf)
- assert.NoError(t, err)
- mock.TestSourceOpen(r, test.exp, t)
- })
- }
- }
|