rpc_schema.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build !core || (rpc && schema)
  15. // +build !core rpc,schema
  16. package server
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/pkg/def"
  21. "github.com/lf-edge/ekuiper/internal/pkg/model"
  22. "github.com/lf-edge/ekuiper/internal/schema"
  23. )
  24. func (t *Server) CreateSchema(arg *model.RPCTypedArgDesc, reply *string) error {
  25. sd := &schema.Info{Type: def.SchemaType(arg.Type)}
  26. if arg.Json != "" {
  27. if err := json.Unmarshal([]byte(arg.Json), sd); err != nil {
  28. return fmt.Errorf("Parse service %s error : %s.", arg.Json, err)
  29. }
  30. }
  31. if sd.Name != arg.Name {
  32. return fmt.Errorf("Create schema error: name mismatch.")
  33. }
  34. if sd.Content != "" && sd.FilePath != "" {
  35. return fmt.Errorf("Invalid body: Cannot specify both content and file")
  36. }
  37. err := schema.Register(sd)
  38. if err != nil {
  39. return fmt.Errorf("Create schema error: %s", err)
  40. } else {
  41. *reply = fmt.Sprintf("Schema %s is created.", arg.Name)
  42. }
  43. return nil
  44. }
  45. func (t *Server) DescSchema(arg *model.RPCTypedArgDesc, reply *string) error {
  46. j, err := schema.GetSchema(def.SchemaType(arg.Type), arg.Name)
  47. if err != nil {
  48. return fmt.Errorf("Desc schema error : %s.", err)
  49. } else if j == nil {
  50. return fmt.Errorf("Desc schema error : not found.")
  51. } else {
  52. r, err := marshalDesc(j)
  53. if err != nil {
  54. return fmt.Errorf("Describe service error: %v", err)
  55. }
  56. *reply = r
  57. }
  58. return nil
  59. }
  60. func (t *Server) DropSchema(arg *model.RPCTypedArgDesc, reply *string) error {
  61. err := schema.DeleteSchema(def.SchemaType(arg.Type), arg.Name)
  62. if err != nil {
  63. return fmt.Errorf("Drop schema error : %s.", err)
  64. }
  65. *reply = fmt.Sprintf("Schema %s is dropped", arg.Name)
  66. return nil
  67. }
  68. func (t *Server) ShowSchemas(schemaType string, reply *string) error {
  69. l, err := schema.GetAllForType(def.SchemaType(schemaType))
  70. if err != nil {
  71. return fmt.Errorf("Show schemas error: %s.", err)
  72. }
  73. if len(l) == 0 {
  74. *reply = "No schema definitions are found."
  75. } else {
  76. r, err := marshalDesc(l)
  77. if err != nil {
  78. return fmt.Errorf("Show service error: %v", err)
  79. }
  80. *reply = r
  81. }
  82. return nil
  83. }