Browse Source

Lookup table fix and new API (#1419)

* fix(binder): return not_found error for lookup source

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* feat(rest):  API to return table of a kind

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 2 years ago
parent
commit
2da9d18d5b

+ 6 - 0
docs/en_US/operation/restapi/tables.md

@@ -31,6 +31,12 @@ Response Sample:
 ["mytable"]
 ```
 
+This API accepts one parameter kind, the value could be `scan` or `lookup` to query each kind of tables. Other values are invalid, it will return all kinds of tables. In below example, we can query all the lookup tables.
+
+```shell
+GET http://localhost:9081/tables?kind=lookup
+```
+
 ## describe a table
 
 The API is used for print the detailed definition of table.

+ 7 - 1
docs/zh_CN/operation/restapi/tables.md

@@ -17,7 +17,7 @@ POST http://localhost:9081/tables
 
 ## 查看所有的表
 
-此API 用于显示 eKuiper 中定义的所有表
+此 API 用于显示 eKuiper 中定义的所有表
 ```shell
 GET http://localhost:9081/tables
 ```
@@ -27,6 +27,12 @@ GET http://localhost:9081/tables
 ["mytable"]
 ```
 
+此 API 可接受一个参数 kind,用于指定所需查看的表的类型。类型值可为 `scan` 或者 `lookup`。其他类型值将返回所有表格。在如下例子中,我们将查看所有 lookup table。
+
+```shell
+GET http://localhost:9081/tables?kind=lookup
+```
+
 ## 查看表的详细信息
 
 该 API 用于打印表的详细定义。

+ 6 - 1
internal/binder/io/binder.go

@@ -15,6 +15,7 @@
 package io
 
 import (
+	"fmt"
 	"github.com/lf-edge/ekuiper/internal/binder"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
@@ -92,5 +93,9 @@ func LookupSource(name string) (api.LookupSource, error) {
 			return r, e.GetError()
 		}
 	}
-	return nil, e.GetError()
+	err := e.GetError()
+	if err == nil {
+		err = fmt.Errorf("lookup source type %s not found", name)
+	}
+	return nil, err
 }

+ 30 - 16
internal/binder/io/binder_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -33,26 +33,36 @@ func TestBindings(t *testing.T) {
 		return
 	}
 	var tests = []struct {
-		name     string
-		isSource bool
-		isSink   bool
+		name           string
+		isSource       bool
+		isLookupSource bool
+		isSink         bool
 	}{
 		{
-			name:     "unknown",
-			isSource: false,
-			isSink:   false,
+			name:           "unknown",
+			isSource:       false,
+			isLookupSource: false,
+			isSink:         false,
 		}, {
-			name:     "mqtt",
-			isSource: true,
-			isSink:   true,
+			name:           "mqtt",
+			isSource:       true,
+			isLookupSource: false,
+			isSink:         true,
 		}, {
-			name:     "mock1",
-			isSource: true,
-			isSink:   true,
+			name:           "mock1",
+			isSource:       true,
+			isLookupSource: false,
+			isSink:         true,
 		}, {
-			name:     "rest",
-			isSource: false,
-			isSink:   true,
+			name:           "rest",
+			isSource:       false,
+			isLookupSource: false,
+			isSink:         true,
+		}, {
+			name:           "redis",
+			isSource:       false,
+			isLookupSource: true,
+			isSink:         true,
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -62,6 +72,10 @@ func TestBindings(t *testing.T) {
 		if tt.isSource != isSource {
 			t.Errorf("%s is source: expect %v but got %v", tt.name, tt.isSource, isSource)
 		}
+		_, err = LookupSource(tt.name)
+		if tt.isLookupSource != (err == nil) {
+			t.Errorf("%s is lookup source: expect %v but got %v", tt.name, tt.isLookupSource, err == nil)
+		}
 		_, err = Sink(tt.name)
 		isSink := err == nil
 		if tt.isSink != isSink {

+ 1 - 1
internal/binder/mock/mock_factory.go

@@ -36,7 +36,7 @@ func (f *MockFactory) Source(name string) (api.Source, error) {
 }
 
 func (f *MockFactory) LookupSource(name string) (api.LookupSource, error) {
-	return nil, errorx.NotFoundErr
+	return nil, nil
 }
 
 func (f *MockFactory) Sink(name string) (api.Sink, error) {

+ 28 - 0
internal/processor/stream.go

@@ -146,6 +146,7 @@ func (p *StreamProcessor) execSave(stmt *ast.StreamStmt, statement string, repla
 	s, err := json.Marshal(xsql.StreamInfo{
 		StreamType: stmt.StreamType,
 		Statement:  statement,
+		StreamKind: stmt.Options.KIND,
 	})
 	if err != nil {
 		return fmt.Errorf("error when saving to db: %v.", err)
@@ -224,6 +225,33 @@ func (p *StreamProcessor) ShowStream(st ast.StreamType) ([]string, error) {
 	return result, nil
 }
 
+func (p *StreamProcessor) ShowTable(kind string) ([]string, error) {
+	if kind == "" {
+		return p.ShowStream(ast.TypeTable)
+	}
+	keys, err := p.db.Keys()
+	if err != nil {
+		return nil, fmt.Errorf("Show tables fails, error when loading data from db: %v.", err)
+	}
+	var (
+		v      string
+		vs     = &xsql.StreamInfo{}
+		result = make([]string, 0)
+	)
+	for _, k := range keys {
+		if ok, _ := p.db.Get(k, &v); ok {
+			if err := json.Unmarshal([]byte(v), vs); err == nil && vs.StreamType == ast.TypeTable {
+				if kind == "scan" && (vs.StreamKind == ast.StreamKindScan || vs.StreamKind == "") {
+					result = append(result, k)
+				} else if kind == "lookup" && vs.StreamKind == ast.StreamKindLookup {
+					result = append(result, k)
+				}
+			}
+		}
+	}
+	return result, nil
+}
+
 func (p *StreamProcessor) getStream(name string, st ast.StreamType) (string, error) {
 	vs, err := xsql.GetDataSourceStatement(p.db, name)
 	if vs != nil && vs.StreamType == st {

+ 35 - 1
internal/processor/stream_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -201,3 +201,37 @@ func TestTableProcessor(t *testing.T) {
 		}
 	}
 }
+
+func TestTableList(t *testing.T) {
+	p := NewStreamProcessor()
+	p.ExecStmt(`CREATE TABLE tt1 () WITH (DATASOURCE="users", FORMAT="JSON", KIND="scan")`)
+	p.ExecStmt(`CREATE TABLE tt2 () WITH (DATASOURCE="users", TYPE="memory", FORMAT="JSON", KIND="lookup")`)
+	p.ExecStmt(`CREATE TABLE tt3 () WITH (DATASOURCE="users", TYPE="memory", FORMAT="JSON", KIND="lookup")`)
+	p.ExecStmt(`CREATE TABLE tt4 () WITH (DATASOURCE="users", FORMAT="JSON")`)
+	defer func() {
+		p.ExecStmt(`DROP TABLE tt1`)
+		p.ExecStmt(`DROP TABLE tt2`)
+		p.ExecStmt(`DROP TABLE tt3`)
+		p.ExecStmt(`DROP TABLE tt4`)
+	}()
+	la, err := p.ShowTable("lookup")
+	if err != nil {
+		t.Errorf("Show lookup table fails: %s", err)
+		return
+	}
+	le := []string{"tt2", "tt3"}
+	if !reflect.DeepEqual(le, la) {
+		t.Errorf("Show lookup table mismatch:\nexp=%s\ngot=%s", le, la)
+		return
+	}
+	ls, err := p.ShowTable("scan")
+	if err != nil {
+		t.Errorf("Show scan table fails: %s", err)
+		return
+	}
+	lse := []string{"tt1", "tt4"}
+	if !reflect.DeepEqual(lse, ls) {
+		t.Errorf("Show scan table mismatch:\nexp=%s\ngot=%s", lse, ls)
+		return
+	}
+}

+ 20 - 1
internal/server/rest.go

@@ -278,7 +278,26 @@ func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamT
 	defer r.Body.Close()
 	switch r.Method {
 	case http.MethodGet:
-		content, err := streamProcessor.ShowStream(st)
+		var (
+			content []string
+			err     error
+			kind    string
+		)
+		if st == ast.TypeTable {
+			kind = r.URL.Query().Get("kind")
+			if kind == "scan" {
+				kind = ast.StreamKindScan
+			} else if kind == "lookup" {
+				kind = ast.StreamKindLookup
+			} else {
+				kind = ""
+			}
+		}
+		if kind != "" {
+			content, err = streamProcessor.ShowTable(kind)
+		} else {
+			content, err = streamProcessor.ShowStream(st)
+		}
 		if err != nil {
 			handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
 			return

+ 1 - 0
internal/xsql/stmtx.go

@@ -55,6 +55,7 @@ func GetStatementFromSql(sql string) (*ast.SelectStatement, error) {
 
 type StreamInfo struct {
 	StreamType ast.StreamType `json:"streamType"`
+	StreamKind string         `json:"streamKind"`
 	Statement  string         `json:"statement"`
 }