123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787 |
- // Copyright 2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package http
- import (
- "encoding/json"
- "errors"
- "fmt"
- "github.com/gorilla/mux"
- "github.com/lf-edge/ekuiper/internal/io/mock"
- "github.com/lf-edge/ekuiper/pkg/api"
- "net"
- "net/http"
- "net/http/httptest"
- "path/filepath"
- "reflect"
- "strconv"
- "testing"
- )
- func jsonOut(w http.ResponseWriter, err error, out interface{}) {
- w.Header().Add("Content-Type", "application/json")
- enc := json.NewEncoder(w)
- err = enc.Encode(out)
- // Problems encoding
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- }
- }
- const (
- DefaultToken = "privatisation"
- RefreshToken = "privaterefresh"
- )
- // mock http auth server
- func mockAuthServer() *httptest.Server {
- l, _ := net.Listen("tcp", "127.0.0.1:52345")
- router := mux.NewRouter()
- i := 0
- router.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) {
- body := &struct {
- Username string `json:"username"`
- Password string `json:"password"`
- }{}
- err := json.NewDecoder(r.Body).Decode(body)
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- }
- if body.Username != "admin" || body.Password != "0000" {
- http.Error(w, "invalid username or password", http.StatusBadRequest)
- }
- out := &struct {
- Token string `json:"token"`
- RefreshToken string `json:"refresh_token"`
- ClientId string `json:"client_id"`
- Expires int64 `json:"expires"`
- }{
- Token: DefaultToken,
- RefreshToken: RefreshToken,
- ClientId: "test",
- Expires: 36000,
- }
- jsonOut(w, err, out)
- }).Methods(http.MethodPost)
- router.HandleFunc("/refresh", func(w http.ResponseWriter, r *http.Request) {
- token := r.Header.Get("Authorization")
- if token != "Bearer "+DefaultToken {
- http.Error(w, "invalid token", http.StatusBadRequest)
- }
- rt := r.Header.Get("RefreshToken")
- if rt != RefreshToken {
- http.Error(w, "invalid refresh token", http.StatusBadRequest)
- }
- out := &struct {
- Token string `json:"token"`
- RefreshToken string `json:"refresh_token"`
- ClientId string `json:"client_id"`
- Expires int64 `json:"expires"`
- }{
- Token: DefaultToken,
- RefreshToken: RefreshToken,
- ClientId: "test",
- Expires: 36000,
- }
- jsonOut(w, nil, out)
- }).Methods(http.MethodPost)
- router.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
- token := r.Header.Get("Authorization")
- if token != "Bearer "+DefaultToken {
- http.Error(w, "invalid token", http.StatusBadRequest)
- }
- out := &struct {
- DeviceId string `json:"device_id"`
- Temperature float64 `json:"temperature"`
- Humidity float64 `json:"humidity"`
- }{
- DeviceId: "device1",
- Temperature: 25.5,
- Humidity: 60.0,
- }
- jsonOut(w, nil, out)
- }).Methods(http.MethodGet)
- // Return same data for 3 times
- router.HandleFunc("/data2", func(w http.ResponseWriter, r *http.Request) {
- out := &struct {
- Code int `json:"code"`
- Data struct {
- DeviceId string `json:"device_id"`
- Temperature float64 `json:"temperature"`
- Humidity float64 `json:"humidity"`
- } `json:"data"`
- }{
- Code: 200,
- Data: struct {
- DeviceId string `json:"device_id"`
- Temperature float64 `json:"temperature"`
- Humidity float64 `json:"humidity"`
- }{
- DeviceId: "device" + strconv.Itoa(i/3),
- Temperature: 25.5,
- Humidity: 60.0,
- },
- }
- i++
- jsonOut(w, nil, out)
- }).Methods(http.MethodGet)
- server := httptest.NewUnstartedServer(router)
- server.Listener.Close()
- server.Listener = l
- return server
- }
- var wrongPath, _ = filepath.Abs("/tmp/wrong")
- // Test configure to properties
- func TestConfigure(t *testing.T) {
- tests := []struct {
- name string
- props map[string]interface{}
- err error
- config *RawConf
- accessConf *AccessTokenConf
- refreshConf *RefreshTokenConf
- tokens map[string]interface{}
- }{
- {
- name: "default",
- props: map[string]interface{}{
- "incremental": true,
- "url": "http://localhost:9090/",
- },
- config: &RawConf{
- Incremental: true,
- Url: "http://localhost:9090/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "none",
- ResponseType: "code",
- InsecureSkipVerify: true,
- },
- },
- // Test wrong properties
- {
- name: "wrong props",
- props: map[string]interface{}{
- "incremental": true,
- "url": 123,
- },
- err: fmt.Errorf("fail to parse the properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '123'"),
- },
- {
- name: "empty url",
- props: map[string]interface{}{
- "incremental": true,
- "url": "",
- },
- err: fmt.Errorf("url is required"),
- },
- {
- name: "wrong method",
- props: map[string]interface{}{
- "url": "http://localhost:9090/",
- "method": "wrong",
- },
- err: fmt.Errorf("Not supported HTTP method wrong."),
- },
- {
- name: "wrong bodytype",
- props: map[string]interface{}{
- "url": "http://localhost:9090/",
- "bodyType": "wrong",
- },
- err: fmt.Errorf("Not valid body type value wrong."),
- },
- {
- name: "wrong response type",
- props: map[string]interface{}{
- "url": "http://localhost:9090/",
- "responseType": "wrong",
- },
- err: fmt.Errorf("Not valid response type value wrong."),
- },
- {
- name: "wrong url",
- props: map[string]interface{}{
- "url": "http:/localhost:9090/",
- },
- err: fmt.Errorf("Invalid url, host not found"),
- },
- {
- name: "wrong interval",
- props: map[string]interface{}{
- "url": "http:/localhost:9090/",
- "interval": -2,
- },
- err: fmt.Errorf("interval must be greater than 0"),
- },
- {
- name: "wrong timeout",
- props: map[string]interface{}{
- "url": "http:/localhost:9090/",
- "timeout": -2,
- },
- err: fmt.Errorf("timeout must be greater than or equal to 0"),
- },
- {
- name: "wrong tls",
- props: map[string]interface{}{
- "url": "http://localhost:9090/",
- "certificationPath": wrongPath,
- },
- err: fmt.Errorf(fmt.Sprintf("stat %s: no such file or directory", wrongPath)),
- },
- // Test oAuth
- {
- name: "oAuth with access token and constant expire",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- },
- },
- config: &RawConf{
- Url: "http://localhost:52345/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "none",
- ResponseType: "code",
- InsecureSkipVerify: true,
- Headers: map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- HeadersMap: map[string]string{
- "Authorization": "Bearer {{.token}}",
- },
- OAuth: map[string]map[string]interface{}{
- "access": {
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "3600",
- ExpireInSecond: 3600,
- },
- tokens: map[string]interface{}{
- "token": DefaultToken,
- "refresh_token": RefreshToken,
- "client_id": "test",
- "expires": float64(36000),
- },
- },
- {
- name: "oAuth with access token and dynamic expire",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "{{.expires}}",
- },
- },
- },
- config: &RawConf{
- Url: "http://localhost:52345/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "none",
- ResponseType: "code",
- InsecureSkipVerify: true,
- Headers: map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- HeadersMap: map[string]string{
- "Authorization": "Bearer {{.token}}",
- },
- OAuth: map[string]map[string]interface{}{
- "access": {
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "{{.expires}}",
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "{{.expires}}",
- ExpireInSecond: 36000,
- },
- tokens: map[string]interface{}{
- "token": DefaultToken,
- "refresh_token": RefreshToken,
- "client_id": "test",
- "expires": float64(36000),
- },
- },
- {
- name: "oAuth with access token and refresh token",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": "http://localhost:52345/refresh",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.refresh_token}}",
- },
- },
- },
- },
- config: &RawConf{
- Url: "http://localhost:52345/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "none",
- ResponseType: "code",
- InsecureSkipVerify: true,
- Headers: map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- HeadersMap: map[string]string{
- "Authorization": "Bearer {{.token}}",
- },
- OAuth: map[string]map[string]interface{}{
- "access": {
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": {
- "url": "http://localhost:52345/refresh",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.refresh_token}}",
- },
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "3600",
- ExpireInSecond: 3600,
- },
- refreshConf: &RefreshTokenConf{
- Url: "http://localhost:52345/refresh",
- Headers: map[string]string{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.refresh_token}}",
- },
- },
- tokens: map[string]interface{}{
- "token": DefaultToken,
- "refresh_token": RefreshToken,
- "client_id": "test",
- "expires": float64(36000),
- },
- },
- // Wrong auth configs
- {
- name: "oAuth wrong access token config",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": 3600,
- },
- },
- },
- err: errors.New("fail to parse the access properties of oAuth: 1 error(s) decoding:\n\n* 'expire' expected type 'string', got unconvertible type 'int', value: '3600'"),
- },
- {
- name: "oAuth wrong access url",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "",
- },
- },
- },
- err: errors.New("access token url is required"),
- },
- {
- name: "oAuth miss access",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "refresh": map[string]interface{}{
- "url": "http://localhost:52345/",
- },
- },
- },
- err: errors.New("if setting oAuth, `access` property is required"),
- },
- {
- name: "oAuth wrong refresh token config",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": 1202,
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "3600",
- ExpireInSecond: 3600,
- },
- err: errors.New("fail to parse the refresh token properties: 1 error(s) decoding:\n\n* 'url' expected type 'string', got unconvertible type 'int', value: '1202'"),
- },
- {
- name: "oAuth refresh token missing url",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": "",
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "3600",
- ExpireInSecond: 3600,
- },
- err: errors.New("refresh token url is required"),
- },
- // oAuth authentication flow errors
- {
- name: "oAuth auth error",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"1234\"}",
- "expire": "3600",
- },
- },
- },
- config: &RawConf{
- Url: "http://localhost:52345/",
- Method: http.MethodGet,
- Interval: DefaultInterval,
- Timeout: DefaultTimeout,
- BodyType: "json",
- ResponseType: "code",
- InsecureSkipVerify: true,
- Headers: map[string]string{
- "Authorization": "Bearer {{.token}}",
- },
- OAuth: map[string]map[string]interface{}{
- "access": {
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- },
- },
- accessConf: &AccessTokenConf{
- Url: "http://localhost:52345/token",
- Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
- Expire: "3600",
- ExpireInSecond: 3600,
- },
- tokens: map[string]interface{}{
- "token": DefaultToken,
- "refresh_token": RefreshToken,
- "client_id": "test",
- "expires": float64(36000),
- },
- err: errors.New("fail to authorize by oAuth: Cannot parse access token response to json: http return code error: 400"),
- },
- {
- name: "oAuth refresh error",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": "http://localhost:52345/refresh",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.token}}",
- },
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: Cannot parse refresh token response to json: http return code error: 400"),
- },
- {
- name: "oAuth wrong access expire template",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "{{..expp}}",
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: fail to parse the expire time for access token: template: sink:1: unexpected . after term \".\""),
- },
- {
- name: "oAuth wrong access expire type",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "{{.token}}",
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: fail to covert the expire time privatisation for access token: cannot convert string(privatisation) to int"),
- },
- {
- name: "oAuth wrong access url",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http:localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "{{.token}}",
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: fail to get access token: Post \"http:localhost:52345/token\": http: no Host in request URL"),
- },
- {
- name: "oAuth wrong refresh header template",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": "http://localhost:52345/refresh",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{..token}}",
- },
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: fail to parse the header for refresh token request RefreshToken: template: sink:1: unexpected . after term \".\""),
- },
- {
- name: "oAuth wrong refresh url",
- props: map[string]interface{}{
- "url": "http://localhost:52345/",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "3600",
- },
- "refresh": map[string]interface{}{
- "url": "http:localhost:52345/refresh2",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.token}}",
- },
- },
- },
- },
- err: errors.New("fail to authorize by oAuth: fail to get refresh token: Post \"http:localhost:52345/refresh2\": http: no Host in request URL"),
- },
- }
- server := mockAuthServer()
- server.Start()
- defer server.Close()
- fmt.Printf("The test bucket size is %d.\n\n", len(tests))
- for i, tt := range tests {
- t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
- r := &PullSource{}
- err := r.Configure("", tt.props)
- if err != nil {
- if tt.err == nil {
- t.Errorf("Expected error: %v", err)
- } else {
- if err.Error() != tt.err.Error() {
- t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
- }
- }
- return
- }
- if !reflect.DeepEqual(r.config, tt.config) {
- t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
- }
- if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
- t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
- }
- if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
- t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
- }
- if !reflect.DeepEqual(r.tokens, tt.tokens) {
- t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
- }
- })
- }
- }
- func TestPullWithAuth(t *testing.T) {
- r := &PullSource{}
- server := mockAuthServer()
- server.Start()
- defer server.Close()
- err := r.Configure("data", map[string]interface{}{
- "url": "http://localhost:52345/",
- "interval": 100,
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- },
- "oAuth": map[string]interface{}{
- "access": map[string]interface{}{
- "url": "http://localhost:52345/token",
- "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
- "expire": "10",
- },
- "refresh": map[string]interface{}{
- "url": "http://localhost:52345/refresh",
- "headers": map[string]interface{}{
- "Authorization": "Bearer {{.token}}",
- "RefreshToken": "{{.refresh_token}}",
- },
- },
- },
- })
- if err != nil {
- t.Errorf(err.Error())
- return
- }
- exp := []api.SourceTuple{
- api.NewDefaultSourceTuple(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}),
- }
- mock.TestSourceOpen(r, exp, t)
- }
- func TestPullIncremental(t *testing.T) {
- r := &PullSource{}
- server := mockAuthServer()
- server.Start()
- defer server.Close()
- err := r.Configure("data2", map[string]interface{}{
- "url": "http://localhost:52345/",
- "interval": 100,
- "incremental": true,
- "responseType": "body",
- })
- if err != nil {
- t.Errorf(err.Error())
- return
- }
- exp := []api.SourceTuple{
- api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
- api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
- api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
- }
- mock.TestSourceOpen(r, exp, t)
- }
|