Ver código fonte

feat: support httppull lookup (#2215)

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 ano atrás
pai
commit
bf3db426a8

+ 2 - 1
internal/binder/io/builtin.go

@@ -51,7 +51,8 @@ var (
 		"file":        func() api.Sink { return file.File() },
 	}
 	lookupSources = map[string]NewLookupSourceFunc{
-		"memory": func() api.LookupSource { return memory.GetLookupSource() },
+		"memory":   func() api.LookupSource { return memory.GetLookupSource() },
+		"httppull": func() api.LookupSource { return http.GetLookUpSource() },
 	}
 )
 

+ 28 - 0
internal/binder/io/builtin_test.go

@@ -0,0 +1,28 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package io
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestLookupSources(t *testing.T) {
+	_, ok := lookupSources["memory"]
+	require.True(t, ok)
+	_, ok = lookupSources["httppull"]
+	require.True(t, ok)
+}

+ 18 - 2
internal/io/http/client.go

@@ -94,7 +94,23 @@ type bodyResp struct {
 
 var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
 
-func (cc *ClientConf) InitConf(device string, props map[string]interface{}) error {
+type ClientConfOption struct {
+	checkInterval bool
+}
+
+type WithClientConfOption func(clientConf *ClientConfOption)
+
+func WithCheckInterval(checkInterval bool) WithClientConfOption {
+	return func(clientConf *ClientConfOption) {
+		clientConf.checkInterval = checkInterval
+	}
+}
+
+func (cc *ClientConf) InitConf(device string, props map[string]interface{}, withOptions ...WithClientConfOption) error {
+	option := &ClientConfOption{}
+	for _, withOption := range withOptions {
+		withOption(option)
+	}
 	c := &RawConf{
 		Url:                "http://localhost",
 		Method:             http.MethodGet,
@@ -116,7 +132,7 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro
 	default:
 		return fmt.Errorf("Not supported HTTP method %s.", c.Method)
 	}
-	if c.Interval <= 0 {
+	if option.checkInterval && c.Interval <= 0 {
 		return fmt.Errorf("interval must be greater than 0")
 	}
 	if c.Timeout < 0 {

+ 1 - 1
internal/io/http/client_test.go

@@ -90,7 +90,7 @@ func TestHeaderConf(t *testing.T) {
 	for i, tt := range tests {
 		t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
 			r := &ClientConf{}
-			err := r.InitConf("", tt.props)
+			err := r.InitConf("", tt.props, WithCheckInterval(true))
 			if err != nil {
 				t.Errorf("Unexpected error: %v", err)
 				return

+ 108 - 0
internal/io/http/httppull_lookup.go

@@ -0,0 +1,108 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package http
+
+import (
+	"time"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+func GetLookUpSource() *lookupSource {
+	return &lookupSource{}
+}
+
+type lookupSource struct {
+	*ClientConf
+}
+
+func (l *lookupSource) Open(ctx api.StreamContext) error {
+	ctx.GetLogger().Infof("lookup source is opened")
+	return nil
+}
+
+func (l *lookupSource) Configure(datasource string, props map[string]interface{}) error {
+	conf.Log.Infof("Initialized Httppull lookup table with configurations %#v.", props)
+	if l.ClientConf == nil {
+		l.ClientConf = &ClientConf{}
+	}
+	return l.InitConf(datasource, props)
+}
+
+func (l *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string, values []interface{}) ([]api.SourceTuple, error) {
+	resps, err := l.pull(ctx)
+	if err != nil {
+		return nil, err
+	}
+	matched := l.lookupJoin(resps, keys, values)
+	var results []api.SourceTuple
+	meta := make(map[string]interface{})
+	for _, resp := range matched {
+		results = append(results, api.NewDefaultSourceTupleWithTime(resp, meta, conf.GetNow()))
+	}
+	return results, nil
+}
+
+func (l *lookupSource) Close(ctx api.StreamContext) error {
+	logger := ctx.GetLogger()
+	logger.Infof("Closing HTTP pull lookup table")
+	return nil
+}
+
+func (l *lookupSource) lookupJoin(dataMap []map[string]interface{}, keys []string, values []interface{}) []map[string]interface{} {
+	var resps []map[string]interface{}
+	for _, resp := range dataMap {
+		match := true
+		for i, k := range keys {
+			if val, ok := resp[k]; !ok || val != values[i] {
+				match = false
+				break
+			}
+		}
+		if match {
+			resps = append(resps, resp)
+		}
+	}
+	return resps
+}
+
+func (l *lookupSource) pull(ctx api.StreamContext) ([]map[string]interface{}, error) {
+	// check oAuth token expiration
+	if l.accessConf != nil && l.accessConf.ExpireInSecond > 0 &&
+		int(time.Now().Sub(l.tokenLastUpdateAt).Abs().Seconds()) >= l.accessConf.ExpireInSecond {
+		ctx.GetLogger().Debugf("Refreshing token for HTTP pull")
+		if err := l.refresh(ctx); err != nil {
+			ctx.GetLogger().Warnf("Refresh HTTP pull token error: %v", err)
+		}
+	}
+	headers, err := l.parseHeaders(ctx, l.tokens)
+	if err != nil {
+		return nil, err
+	}
+	ctx.GetLogger().Debugf("httppull source sending request url: %s, headers: %v, body %s", l.config.Url, headers, l.config.Body)
+	resp, e := httpx.Send(ctx.GetLogger(), l.client, l.config.BodyType, l.config.Method, l.config.Url, headers, true, l.config.Body)
+	if e != nil {
+		ctx.GetLogger().Warnf("Found error %s when trying to reach %v ", e, l)
+		return nil, err
+	}
+	ctx.GetLogger().Debugf("httppull source got response %v", resp)
+	results, _, e := l.parseResponse(ctx, resp, true, nil)
+	if e != nil {
+		return nil, err
+	}
+	return results, nil
+}

+ 349 - 0
internal/io/http/httppull_lookup_test.go

@@ -0,0 +1,349 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package http
+
+import (
+	"fmt"
+	"net/http"
+	"reflect"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+)
+
+func TestConfigureLookup(t *testing.T) {
+	tests := []struct {
+		name        string
+		props       map[string]interface{}
+		err         error
+		config      *RawConf
+		accessConf  *AccessTokenConf
+		refreshConf *RefreshTokenConf
+		tokens      map[string]interface{}
+	}{
+		// Test oAuth
+		{
+			name: "oAuth with access token and constant expire",
+			props: map[string]interface{}{
+				"url": "http://localhost:52345/",
+				"headers": map[string]interface{}{
+					"Authorization": "Bearer {{.token}}",
+				},
+				"oAuth": map[string]interface{}{
+					"access": map[string]interface{}{
+						"url":    "http://localhost:52345/token",
+						"body":   "{\"username\": \"admin\",\"password\": \"0000\"}",
+						"expire": "3600",
+					},
+				},
+			},
+			config: &RawConf{
+				Url:                "http://localhost:52345/",
+				ResendUrl:          "http://localhost:52345/",
+				Method:             http.MethodGet,
+				Interval:           DefaultInterval,
+				Timeout:            DefaultTimeout,
+				BodyType:           "none",
+				ResponseType:       "code",
+				InsecureSkipVerify: true,
+				Headers: map[string]interface{}{
+					"Authorization": "Bearer {{.token}}",
+				},
+				HeadersMap: map[string]string{
+					"Authorization": "Bearer {{.token}}",
+				},
+				OAuth: map[string]map[string]interface{}{
+					"access": {
+						"url":    "http://localhost:52345/token",
+						"body":   "{\"username\": \"admin\",\"password\": \"0000\"}",
+						"expire": "3600",
+					},
+				},
+			},
+			accessConf: &AccessTokenConf{
+				Url:            "http://localhost:52345/token",
+				Body:           "{\"username\": \"admin\",\"password\": \"0000\"}",
+				Expire:         "3600",
+				ExpireInSecond: 3600,
+			},
+			tokens: map[string]interface{}{
+				"token":         DefaultToken,
+				"refresh_token": RefreshToken,
+				"client_id":     "test",
+				"expires":       float64(36000),
+			},
+		},
+		{
+			name: "oAuth with access token and dynamic expire",
+			props: map[string]interface{}{
+				"url": "http://localhost:52345/",
+				"headers": map[string]interface{}{
+					"Authorization": "Bearer {{.token}}",
+				},
+				"oAuth": map[string]interface{}{
+					"access": map[string]interface{}{
+						"url":    "http://localhost:52345/token",
+						"body":   "{\"username\": \"admin\",\"password\": \"0000\"}",
+						"expire": "{{.expires}}",
+					},
+				},
+			},
+			config: &RawConf{
+				Url:                "http://localhost:52345/",
+				ResendUrl:          "http://localhost:52345/",
+				Method:             http.MethodGet,
+				Interval:           DefaultInterval,
+				Timeout:            DefaultTimeout,
+				BodyType:           "none",
+				ResponseType:       "code",
+				InsecureSkipVerify: true,
+				Headers: map[string]interface{}{
+					"Authorization": "Bearer {{.token}}",
+				},
+				HeadersMap: map[string]string{
+					"Authorization": "Bearer {{.token}}",
+				},
+				OAuth: map[string]map[string]interface{}{
+					"access": {
+						"url":    "http://localhost:52345/token",
+						"body":   "{\"username\": \"admin\",\"password\": \"0000\"}",
+						"expire": "{{.expires}}",
+					},
+				},
+			},
+			accessConf: &AccessTokenConf{
+				Url:            "http://localhost:52345/token",
+				Body:           "{\"username\": \"admin\",\"password\": \"0000\"}",
+				Expire:         "{{.expires}}",
+				ExpireInSecond: 36000,
+			},
+			tokens: map[string]interface{}{
+				"token":         DefaultToken,
+				"refresh_token": RefreshToken,
+				"client_id":     "test",
+				"expires":       float64(36000),
+			},
+		},
+		{
+			name: "oAuth with access token and refresh token",
+			props: map[string]interface{}{
+				"url": "http://localhost:52345/",
+				"headers": map[string]interface{}{
+					"Authorization": "Bearer {{.token}}",
+				},
+				"oAuth": map[string]interface{}{
+					"access": map[string]interface{}{
+						"url":    "http://localhost:52345/token",
+						"body":   "{\"username\": \"admin\",\"password\": \"0000\"}",
+						"expire": "3600",
+					},
+					"refresh": map[string]interface{}{
+						"url": "http://localhost:52345/refresh",
+						"headers": map[string]interface{}{
+							"Authorization": "Bearer {{.token}}",
+							"RefreshToken":  "{{.refresh_token}}",
+						},
+					},
+				},
+			},
+			config: &RawConf{
+				Url:                "http://localhost:52345/",
+				ResendUrl:          "http://localhost:52345/",
+				Method:             http.MethodGet,
+				Interval:           DefaultInterval,
+				Timeout:            DefaultTimeout,
+				BodyType:           "none",
+				ResponseType:       "code",
+				InsecureSkipVerify: true,
+				Headers: map[string]interface{}{
+					"Authorization": "Bearer {{.token}}",
+				},
+				HeadersMap: map[string]string{
+					"Authorization": "Bearer {{.token}}",
+				},
+				OAuth: map[string]map[string]interface{}{
+					"access": {
+						"url":    "http://localhost:52345/token",
+						"body":   "{\"username\": \"admin\",\"password\": \"0000\"}",
+						"expire": "3600",
+					},
+					"refresh": {
+						"url": "http://localhost:52345/refresh",
+						"headers": map[string]interface{}{
+							"Authorization": "Bearer {{.token}}",
+							"RefreshToken":  "{{.refresh_token}}",
+						},
+					},
+				},
+			},
+			accessConf: &AccessTokenConf{
+				Url:            "http://localhost:52345/token",
+				Body:           "{\"username\": \"admin\",\"password\": \"0000\"}",
+				Expire:         "3600",
+				ExpireInSecond: 3600,
+			},
+			refreshConf: &RefreshTokenConf{
+				Url: "http://localhost:52345/refresh",
+				Headers: map[string]string{
+					"Authorization": "Bearer {{.token}}",
+					"RefreshToken":  "{{.refresh_token}}",
+				},
+			},
+			tokens: map[string]interface{}{
+				"token":         DefaultToken,
+				"refresh_token": RefreshToken,
+				"client_id":     "test",
+				"expires":       float64(36000),
+			},
+		},
+		// test default
+		{
+			name: "default",
+			props: map[string]interface{}{
+				"url": "http://localhost:9090/",
+			},
+			config: &RawConf{
+				Url:                "http://localhost:9090/",
+				ResendUrl:          "http://localhost:9090/",
+				Method:             http.MethodGet,
+				Interval:           DefaultInterval,
+				Timeout:            DefaultTimeout,
+				BodyType:           "none",
+				ResponseType:       "code",
+				InsecureSkipVerify: true,
+			},
+		},
+	}
+	server := mockAuthServer()
+	server.Start()
+
+	defer server.Close()
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
+			r := &lookupSource{}
+			err := r.Configure("", tt.props)
+			if err != nil {
+				if tt.err == nil {
+					t.Errorf("Expected error: %v", err)
+				} else {
+					if err.Error() != tt.err.Error() {
+						t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
+					}
+				}
+				return
+			}
+			if !reflect.DeepEqual(r.config, tt.config) {
+				t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
+			}
+			if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
+				t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
+			}
+			if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
+				t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
+			}
+			if !reflect.DeepEqual(r.tokens, tt.tokens) {
+				t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
+			}
+		})
+	}
+}
+
+func TestLookupPull(t *testing.T) {
+	conf.IsTesting = false
+	conf.InitClock()
+	r := &lookupSource{}
+	server := mockAuthServer()
+	server.Start()
+	defer server.Close()
+	err := r.Configure("data3", map[string]interface{}{
+		"url":          "http://localhost:52345/",
+		"responseType": "body",
+	})
+	require.NoError(t, err)
+	resp, err := r.pull(context.Background())
+	require.NoError(t, err)
+	require.Equal(t, []map[string]interface{}{
+		{
+			"code": float64(200),
+			"data": map[string]interface{}{
+				"device_id":   "d1",
+				"temperature": float64(25.5),
+				"humidity":    float64(60),
+			},
+		},
+		{
+			"code": float64(200),
+			"data": map[string]interface{}{
+				"device_id":   "d2",
+				"temperature": float64(25.5),
+				"humidity":    float64(60),
+			},
+		},
+	}, resp)
+}
+
+func TestLookupJoin(t *testing.T) {
+	datas := []map[string]interface{}{
+		{
+			"a": 1,
+			"b": 3,
+			"c": 5,
+		},
+		{
+			"a": 2,
+			"b": 4,
+			"c": 6,
+		},
+	}
+	keys := []string{"a"}
+	values := []interface{}{1}
+	l := &lookupSource{}
+	got := l.lookupJoin(datas, keys, values)
+	require.Equal(t, []map[string]interface{}{
+		{
+			"a": 1,
+			"b": 3,
+			"c": 5,
+		},
+	}, got)
+}
+
+func TestLookup(t *testing.T) {
+	conf.IsTesting = false
+	conf.InitClock()
+	r := &lookupSource{}
+	server := mockAuthServer()
+	server.Start()
+	defer server.Close()
+	err := r.Configure("data3", map[string]interface{}{
+		"url":          "http://localhost:52345/",
+		"responseType": "body",
+	})
+	require.NoError(t, err)
+	tuples, err := r.Lookup(context.Background(), nil, []string{"code"}, []interface{}{float64(200)})
+	require.NoError(t, err)
+	require.Len(t, tuples, 2)
+}
+
+func TestLookupActions(t *testing.T) {
+	r := &lookupSource{}
+	require.NoError(t, r.Open(context.Background()))
+	require.NoError(t, r.Close(context.Background()))
+	require.NotNil(t, GetLookUpSource())
+}

+ 1 - 1
internal/io/http/httppull_source.go

@@ -38,7 +38,7 @@ type PullSource struct {
 
 func (hps *PullSource) Configure(device string, props map[string]interface{}) error {
 	conf.Log.Infof("Initialized Httppull source with configurations %#v.", props)
-	return hps.InitConf(device, props)
+	return hps.InitConf(device, props, WithCheckInterval(true))
 }
 
 func (hps *PullSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {