rpc_schema.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build !core || (rpc && schema)
  15. // +build !core rpc,schema
  16. package server
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/pkg/def"
  21. "github.com/lf-edge/ekuiper/internal/pkg/model"
  22. "github.com/lf-edge/ekuiper/internal/schema"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. )
  25. func (t *Server) CreateSchema(arg *model.RPCTypedArgDesc, reply *string) error {
  26. sd := &schema.Info{Type: def.SchemaType(arg.Type)}
  27. if arg.Json != "" {
  28. if err := json.Unmarshal(cast.StringToBytes(arg.Json), sd); err != nil {
  29. return fmt.Errorf("Parse service %s error : %s.", arg.Json, err)
  30. }
  31. }
  32. if sd.Name != arg.Name {
  33. return fmt.Errorf("Create schema error: name mismatch.")
  34. }
  35. if sd.Content != "" && sd.FilePath != "" {
  36. return fmt.Errorf("Invalid body: Cannot specify both content and file")
  37. }
  38. err := schema.Register(sd)
  39. if err != nil {
  40. return fmt.Errorf("Create schema error: %s", err)
  41. } else {
  42. *reply = fmt.Sprintf("Schema %s is created.", arg.Name)
  43. }
  44. return nil
  45. }
  46. func (t *Server) DescSchema(arg *model.RPCTypedArgDesc, reply *string) error {
  47. j, err := schema.GetSchema(def.SchemaType(arg.Type), arg.Name)
  48. if err != nil {
  49. return fmt.Errorf("Desc schema error : %s.", err)
  50. } else if j == nil {
  51. return fmt.Errorf("Desc schema error : not found.")
  52. } else {
  53. r, err := marshalDesc(j)
  54. if err != nil {
  55. return fmt.Errorf("Describe service error: %v", err)
  56. }
  57. *reply = r
  58. }
  59. return nil
  60. }
  61. func (t *Server) DropSchema(arg *model.RPCTypedArgDesc, reply *string) error {
  62. err := schema.DeleteSchema(def.SchemaType(arg.Type), arg.Name)
  63. if err != nil {
  64. return fmt.Errorf("Drop schema error : %s.", err)
  65. }
  66. *reply = fmt.Sprintf("Schema %s is dropped", arg.Name)
  67. return nil
  68. }
  69. func (t *Server) ShowSchemas(schemaType string, reply *string) error {
  70. l, err := schema.GetAllForType(def.SchemaType(schemaType))
  71. if err != nil {
  72. return fmt.Errorf("Show schemas error: %s.", err)
  73. }
  74. if len(l) == 0 {
  75. *reply = "No schema definitions are found."
  76. } else {
  77. r, err := marshalDesc(l)
  78. if err != nil {
  79. return fmt.Errorf("Show service error: %v", err)
  80. }
  81. *reply = r
  82. }
  83. return nil
  84. }