Просмотр исходного кода

feat(cli): schema api

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 лет назад
Родитель
Сommit
732a0780dd
3 измененных файлов с 182 добавлено и 5 удалено
  1. 87 4
      cmd/kuiper/main.go
  2. 5 1
      internal/pkg/model/data.go
  3. 90 0
      internal/server/rpc_schema.go

+ 87 - 4
cmd/kuiper/main.go

@@ -151,7 +151,7 @@ func main() {
 		{
 			Name:    "create",
 			Aliases: []string{"create"},
-			Usage:   "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file | create service $service_name $service_json ",
+			Usage:   "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file | create service $service_name $service_json | create schema $schema_type $schema_name $schema_json",
 
 			Subcommands: []cli.Command{
 				{
@@ -336,12 +336,34 @@ func main() {
 						return nil
 					},
 				},
+				{
+					Name:  "schema",
+					Usage: "create schema $schema_type $schema_name $schema_json",
+					Action: func(c *cli.Context) error {
+						if len(c.Args()) < 3 {
+							fmt.Printf("Expect plugin type, name and json.\n")
+							return nil
+						}
+						var reply string
+						err = client.Call("Server.CreateSchema", &model.RPCTypedArgDesc{
+							Type: c.Args()[0],
+							Name: c.Args()[1],
+							Json: c.Args()[2],
+						}, &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
+				},
 			},
 		},
 		{
 			Name:    "describe",
 			Aliases: []string{"describe"},
-			Usage:   "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name | describe udf $udf_name | describe service $service_name | describe service_func $service_func_name",
+			Usage:   "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name | describe udf $udf_name | describe service $service_name | describe service_func $service_func_name | describe schema $schema_type $schema_name",
 			Subcommands: []cli.Command{
 				{
 					Name:  "stream",
@@ -470,13 +492,35 @@ func main() {
 						return nil
 					},
 				},
+				{
+					Name:  "schema",
+					Usage: "describe schema $schema_type $schema_name",
+					Action: func(c *cli.Context) error {
+						if len(c.Args()) != 2 {
+							fmt.Printf("Expect schema type and name.\n")
+							return nil
+						}
+						args := &model.RPCTypedArgDesc{
+							Type: c.Args()[0],
+							Name: c.Args()[1],
+						}
+						var reply string
+						err = client.Call("Server.DescSchema", args, &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
+				},
 			},
 		},
 
 		{
 			Name:    "drop",
 			Aliases: []string{"drop"},
-			Usage:   "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -s $stop | drop service $service_name",
+			Usage:   "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -s $stop | drop service $service_name | drop schema $schema_type $schema_name",
 			Subcommands: []cli.Command{
 				{
 					Name:  "stream",
@@ -578,13 +622,35 @@ func main() {
 						return nil
 					},
 				},
+				{
+					Name:  "schema",
+					Usage: "drop schema $schema_type $schema_name",
+					Action: func(c *cli.Context) error {
+						if len(c.Args()) != 2 {
+							fmt.Printf("Expect schema type and name.\n")
+							return nil
+						}
+						args := &model.RPCTypedArgDesc{
+							Type: c.Args()[0],
+							Name: c.Args()[1],
+						}
+						var reply string
+						err = client.Call("Server.DropSchema", args, &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
+				},
 			},
 		},
 
 		{
 			Name:    "show",
 			Aliases: []string{"show"},
-			Usage:   "show streams | show tables | show rules | show plugins $plugin_type | show services | show service_funcs",
+			Usage:   "show streams | show tables | show rules | show plugins $plugin_type | show services | show service_funcs | show schemas $schema_type",
 
 			Subcommands: []cli.Command{
 				{
@@ -679,6 +745,23 @@ func main() {
 						}
 						return nil
 					},
+				}, {
+					Name:  "schemas",
+					Usage: "show schemas $schema_type",
+					Action: func(c *cli.Context) error {
+						if len(c.Args()) != 1 {
+							fmt.Printf("Expect schema type.\n")
+							return nil
+						}
+						var reply string
+						err = client.Call("Server.ShowSchemas", c.Args()[0], &reply)
+						if err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Println(reply)
+						}
+						return nil
+					},
 				},
 			},
 		},

+ 5 - 1
internal/pkg/model/data.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -18,6 +18,10 @@ type RPCArgDesc struct {
 	Name, Json string
 }
 
+type RPCTypedArgDesc struct {
+	Type, Name, Json string
+}
+
 type PluginDesc struct {
 	RPCArgDesc
 	Type int

+ 90 - 0
internal/server/rpc_schema.go

@@ -0,0 +1,90 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !core || (rpc && schema)
+// +build !core rpc,schema
+
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/def"
+	"github.com/lf-edge/ekuiper/internal/pkg/model"
+	"github.com/lf-edge/ekuiper/internal/schema"
+)
+
+func (t *Server) CreateSchema(arg *model.RPCTypedArgDesc, reply *string) error {
+	sd := &schema.Info{Type: def.SchemaType(arg.Type)}
+	if arg.Json != "" {
+		if err := json.Unmarshal([]byte(arg.Json), sd); err != nil {
+			return fmt.Errorf("Parse service %s error : %s.", arg.Json, err)
+		}
+	}
+	if sd.Name != arg.Name {
+		return fmt.Errorf("Create schema error: name mismatch.")
+	}
+	if sd.Content != "" && sd.FilePath != "" {
+		return fmt.Errorf("Invalid body: Cannot specify both content and file")
+	}
+	err := schema.Register(sd)
+	if err != nil {
+		return fmt.Errorf("Create schema error: %s", err)
+	} else {
+		*reply = fmt.Sprintf("Schema %s is created.", arg.Name)
+	}
+	return nil
+}
+
+func (t *Server) DescSchema(arg *model.RPCTypedArgDesc, reply *string) error {
+	j, err := schema.GetSchema(def.SchemaType(arg.Type), arg.Name)
+	if err != nil {
+		return fmt.Errorf("Desc schema error : %s.", err)
+	} else if j == nil {
+		return fmt.Errorf("Desc schema error : not found.")
+	} else {
+		r, err := marshalDesc(j)
+		if err != nil {
+			return fmt.Errorf("Describe service error: %v", err)
+		}
+		*reply = r
+	}
+	return nil
+}
+
+func (t *Server) DropSchema(arg *model.RPCTypedArgDesc, reply *string) error {
+	err := schema.DeleteSchema(def.SchemaType(arg.Type), arg.Name)
+	if err != nil {
+		return fmt.Errorf("Drop schema error : %s.", err)
+	}
+	*reply = fmt.Sprintf("Schema %s is dropped", arg.Name)
+	return nil
+}
+
+func (t *Server) ShowSchemas(schemaType string, reply *string) error {
+	l, err := schema.GetAllForType(def.SchemaType(schemaType))
+	if err != nil {
+		return fmt.Errorf("Show schemas error: %s.", err)
+	}
+	if len(l) == 0 {
+		*reply = "No schema definitions are found."
+	} else {
+		r, err := marshalDesc(l)
+		if err != nil {
+			return fmt.Errorf("Show service error: %v", err)
+		}
+		*reply = r
+	}
+	return nil
+}