Prechádzať zdrojové kódy

feat(schema): API to get the inferred schema

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 rokov pred
rodič
commit
443aa62d2d

+ 84 - 0
internal/processor/stream.go

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
+	"github.com/lf-edge/ekuiper/internal/schema"
 	"github.com/lf-edge/ekuiper/internal/topo/lookup"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/ast"
@@ -335,6 +336,89 @@ func (p *StreamProcessor) DescStream(name string, st ast.StreamType) (ast.Statem
 	return stream, nil
 }
 
+// GetInferredSchema returns the stream fields inferred from the schema file
+// referenced by the stored definition of the named stream or table.
+//
+// The stored statement is loaded with getStream and parsed back into an
+// *ast.StreamStmt. When the statement's SCHEMAID option is set, the result of
+// schema.InferFromSchemaFile for its FORMAT/SCHEMAID pair is returned.
+// When SCHEMAID is empty there is nothing to infer from and (nil, nil) is
+// returned, so callers must handle a nil result with a nil error.
+func (p *StreamProcessor) GetInferredSchema(name string, st ast.StreamType) (ast.StreamFields, error) {
+	statement, err := p.getStream(name, st)
+	if err != nil {
+		// %w renders the same text as %s but keeps the cause unwrappable.
+		return nil, fmt.Errorf("Describe %s fails, %w.", ast.StreamTypeMap[st], err)
+	}
+	parser := xsql.NewParser(strings.NewReader(statement))
+	stream, err := xsql.Language.Parse(parser)
+	if err != nil {
+		return nil, err
+	}
+	stmt, ok := stream.(*ast.StreamStmt)
+	if !ok {
+		return nil, fmt.Errorf("Describe %s fails, cannot parse the data \"%s\" to a stream statement", ast.StreamTypeMap[st], statement)
+	}
+	if stmt.Options.SCHEMAID != "" {
+		return schema.InferFromSchemaFile(stmt.Options.FORMAT, stmt.Options.SCHEMAID)
+	}
+	// No schema file is attached to this definition.
+	return nil, nil
+}
+
+// GetInferredJsonSchema returns the schema of the named stream or table as a
+// map from field name to its JSON-schema-like description.
+//
+// The stored statement is loaded with getStream and parsed back into an
+// *ast.StreamStmt. The fields declared in the statement are used by default;
+// when the SCHEMAID option is set they are replaced by the fields returned by
+// schema.InferFromSchemaFile for the statement's FORMAT/SCHEMAID pair.
+func (p *StreamProcessor) GetInferredJsonSchema(name string, st ast.StreamType) (map[string]*ast.JsonStreamField, error) {
+	statement, err := p.getStream(name, st)
+	if err != nil {
+		// %w renders the same text as %s but keeps the cause unwrappable.
+		return nil, fmt.Errorf("Describe %s fails, %w.", ast.StreamTypeMap[st], err)
+	}
+	parser := xsql.NewParser(strings.NewReader(statement))
+	stream, err := xsql.Language.Parse(parser)
+	if err != nil {
+		return nil, err
+	}
+	stmt, ok := stream.(*ast.StreamStmt)
+	if !ok {
+		return nil, fmt.Errorf("Describe %s fails, cannot parse the data \"%s\" to a stream statement", ast.StreamTypeMap[st], statement)
+	}
+	// Start from the declared fields; a schema file, if any, takes precedence.
+	sfs := stmt.StreamFields
+	if stmt.Options.SCHEMAID != "" {
+		sfs, err = schema.InferFromSchemaFile(stmt.Options.FORMAT, stmt.Options.SCHEMAID)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return convertSchema(sfs), nil
+}
+
+// convertSchema builds a map keyed by field name whose values are the
+// converted descriptions of each field's type, as produced by convertFieldType.
+func convertSchema(sfs ast.StreamFields) map[string]*ast.JsonStreamField {
+	fields := make(map[string]*ast.JsonStreamField, len(sfs))
+	for _, field := range sfs {
+		converted := convertFieldType(field.FieldType)
+		fields[field.Name] = converted
+	}
+	return fields
+}
+
+// convertFieldType translates a single field type into its JsonStreamField
+// description.
+//
+//   - *ast.BasicType becomes a field whose Type is the basic type's name.
+//   - *ast.ArrayType becomes an "array" field; Items is converted recursively
+//     from the nested field type when the element is itself an array or
+//     struct, and is otherwise a plain field named after the element type.
+//   - *ast.RecType becomes a "struct" field whose Properties are the converted
+//     member fields.
+//
+// Any other type yields nil.
+func convertFieldType(sf ast.FieldType) *ast.JsonStreamField {
+	switch typed := sf.(type) {
+	case *ast.BasicType:
+		return &ast.JsonStreamField{Type: typed.Type.String()}
+	case *ast.ArrayType:
+		// Composite elements carry their own nested definition in FieldType.
+		if typed.Type == ast.ARRAY || typed.Type == ast.STRUCT {
+			return &ast.JsonStreamField{
+				Type:  "array",
+				Items: convertFieldType(typed.FieldType),
+			}
+		}
+		element := &ast.JsonStreamField{Type: typed.Type.String()}
+		return &ast.JsonStreamField{
+			Type:  "array",
+			Items: element,
+		}
+	case *ast.RecType:
+		members := convertSchema(typed.StreamFields)
+		return &ast.JsonStreamField{
+			Type:       "struct",
+			Properties: members,
+		}
+	}
+	// should never happen
+	return nil
+}
+
 func (p *StreamProcessor) execExplain(stmt ast.NameNode, st ast.StreamType) (string, error) {
 	_, err := p.getStream(stmt.GetName(), st)
 	if err != nil {

+ 111 - 0
internal/processor/stream_test.go

@@ -16,8 +16,15 @@ package processor
 
 import (
 	"fmt"
+	"github.com/gdexlab/go-render/render"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/schema"
 	"github.com/lf-edge/ekuiper/internal/testx"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"os"
+	"path/filepath"
 	"reflect"
+	"strconv"
 	"testing"
 )
 
@@ -267,3 +274,107 @@ func TestAll(t *testing.T) {
 		t.Errorf("Expect\t %v\nBut got\t%v", expected, all)
 	}
 }
+
+// TestInferredStream checks GetInferredJsonSchema against three streams: one
+// with inline fields only, one backed by a protobuf schema and one backed by a
+// custom-format schema.
+//
+// It copies myFormat.so (which must already be built into the data directory)
+// and ../schema/test/test2.proto into temporary schema directories, and
+// removes those directories when the test ends.
+func TestInferredStream(t *testing.T) {
+	// init schema
+	// Prepare test schema file
+	dataDir, err := conf.GetDataLoc()
+	if err != nil {
+		t.Fatal(err)
+	}
+	etcDir := filepath.Join(dataDir, "schemas", "custom")
+	err = os.MkdirAll(etcDir, os.ModePerm)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		// Closure-local error: cleanup must not overwrite the outer err.
+		if rmErr := os.RemoveAll(etcDir); rmErr != nil {
+			t.Errorf("remove %s: %v", etcDir, rmErr)
+		}
+	}()
+	// build the so file into data/test prior to running the test
+	bytesRead, err := os.ReadFile(filepath.Join(dataDir, "myFormat.so"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = os.WriteFile(filepath.Join(etcDir, "myFormat.so"), bytesRead, 0755)
+	if err != nil {
+		t.Fatal(err)
+	}
+	petcDir := filepath.Join(dataDir, "schemas", "protobuf")
+	err = os.MkdirAll(petcDir, os.ModePerm)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Register the cleanup as soon as the directory exists, so a failure while
+	// copying the proto file below does not leave the directory behind.
+	defer func() {
+		if rmErr := os.RemoveAll(petcDir); rmErr != nil {
+			t.Errorf("remove %s: %v", petcDir, rmErr)
+		}
+	}()
+	//Copy test2.proto
+	bytesRead, err = os.ReadFile("../schema/test/test2.proto")
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = os.WriteFile(filepath.Join(petcDir, "test2.proto"), bytesRead, 0755)
+	if err != nil {
+		t.Fatal(err)
+	}
+	schema.InitRegistry()
+
+	var tests = []struct {
+		s   string
+		r   map[string]*ast.JsonStreamField
+		err string
+	}{
+		{
+			s: `CREATE STREAM demo0 (USERID bigint, NAME string) WITH (FORMAT="JSON", DATASOURCE="demo", SHARED="TRUE")`,
+			r: map[string]*ast.JsonStreamField{
+				"USERID": {Type: "bigint"},
+				"NAME":   {Type: "string"},
+			},
+		}, {
+			s: `CREATE STREAM demo1 (USERID bigint, NAME string) WITH (FORMAT="protobuf", DATASOURCE="demo", SCHEMAID="test2.Book")`,
+			r: map[string]*ast.JsonStreamField{
+				"name":   {Type: "string"},
+				"author": {Type: "string"},
+			},
+		}, {
+			s: `CREATE STREAM demo2 () WITH (FORMAT="custom", DATASOURCE="demo", SCHEMAID="myFormat.Sample")`,
+			r: map[string]*ast.JsonStreamField{
+				"id":   {Type: "bigint"},
+				"name": {Type: "string"},
+				"age":  {Type: "bigint"},
+				"hobbies": {
+					Type: "struct",
+					Properties: map[string]*ast.JsonStreamField{
+						"indoor":  {Type: "array", Items: &ast.JsonStreamField{Type: "string"}},
+						"outdoor": {Type: "array", Items: &ast.JsonStreamField{Type: "string"}},
+					},
+				},
+			},
+		},
+	}
+
+	p := NewStreamProcessor()
+	p.db.Clean()
+	defer p.db.Clean()
+	for i, tt := range tests {
+		_, err := p.ExecStmt(tt.s)
+		if err != nil {
+			t.Errorf("%d. ExecStmt(%q) error: %v", i, tt.s, err)
+			continue
+		}
+		// Each statement above creates the stream named demo<i>.
+		sf, err := p.GetInferredJsonSchema("demo"+strconv.Itoa(i), ast.TypeStream)
+		if err != nil {
+			t.Errorf("%d. GetInferredJsonSchema fails: %s", i, err)
+			continue
+		}
+		if !reflect.DeepEqual(sf, tt.r) {
+			t.Errorf("%d. GetInferredJsonSchema mismatch:\nexp=%v\ngot=%v", i, render.AsCode(tt.r), render.AsCode(sf))
+		}
+	}
+
+}

+ 21 - 0
internal/server/rest.go

@@ -122,8 +122,10 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
 	r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
 	r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
+	r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
+	r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
 	r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
@@ -386,6 +388,25 @@ func tableHandler(w http.ResponseWriter, r *http.Request) {
 	sourceManageHandler(w, r, ast.TypeTable)
 }
 
+// streamSchemaHandler responds with the schema of the named stream by
+// delegating to sourceSchemaHandler with ast.TypeStream.
+func streamSchemaHandler(w http.ResponseWriter, r *http.Request) {
+	sourceSchemaHandler(w, r, ast.TypeStream)
+}
+
+// tableSchemaHandler responds with the schema of the named table by
+// delegating to sourceSchemaHandler with ast.TypeTable.
+func tableSchemaHandler(w http.ResponseWriter, r *http.Request) {
+	sourceSchemaHandler(w, r, ast.TypeTable)
+}
+
+// sourceSchemaHandler looks up the "name" route variable, asks the stream
+// processor for the JSON schema of that stream or table (selected by st) and
+// writes it as the JSON response. Lookup failures are reported via handleError.
+func sourceSchemaHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
+	sourceName := mux.Vars(r)["name"]
+	jsonSchema, err := streamProcessor.GetInferredJsonSchema(sourceName, st)
+	if err != nil {
+		prefix := fmt.Sprintf("get schema of %s error", ast.StreamTypeMap[st])
+		handleError(w, err, prefix, logger)
+		return
+	}
+	jsonResponse(jsonSchema, w, logger)
+}
+
 // list or create rules
 func rulesHandler(w http.ResponseWriter, r *http.Request) {
 	defer r.Body.Close()