Переглянути джерело

feat(schema): implement schema registry (#1271)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 2 роки тому
батько
коміт
92f23b7add

+ 25 - 0
internal/pkg/def/schema.go

@@ -0,0 +1,25 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package def
+
+type SchemaType string
+
+const (
+	PROTOBUF SchemaType = "protobuf"
+)
+
+var SchemaTypes = []SchemaType{
+	PROTOBUF,
+}

+ 166 - 0
internal/schema/registry.go

@@ -0,0 +1,166 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schema
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/def"
+	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+)
+
+// Initialize in the server startup
+var registry *Registry
+
+// Registry is a global registry for schemas
+// It stores the schema ids and the ref to its file content in memory
+// The schema definition is stored in the file system and will only be loaded once used
+type Registry struct {
+	sync.RWMutex
+	// The map of schema files for all types
+	schemas map[def.SchemaType]map[string]string
+}
+
+// Registry provide the method to add, update, get and parse and delete schemas
+
+// InitRegistry initialize the registry, only called once by the server
+func InitRegistry() error {
+	registry = &Registry{
+		schemas: make(map[def.SchemaType]map[string]string, len(def.SchemaTypes)),
+	}
+	etcDir, err := conf.GetConfLoc()
+	if err != nil {
+		return fmt.Errorf("cannot find etc folder: %s", err)
+	}
+	for _, schemaType := range def.SchemaTypes {
+		schemaDir := filepath.Join(etcDir, string(schemaType))
+		var newSchemas map[string]string
+		files, err := ioutil.ReadDir(schemaDir)
+		if err != nil {
+			conf.Log.Warnf("cannot read schema directory: %s", err)
+			newSchemas = make(map[string]string)
+		} else {
+			newSchemas = make(map[string]string, len(files))
+			for _, file := range files {
+				fileName := filepath.Base(file.Name())
+				schemaId := strings.TrimSuffix(fileName, filepath.Ext(fileName))
+				newSchemas[schemaId] = filepath.Join(schemaDir, file.Name())
+			}
+		}
+		registry.schemas[schemaType] = newSchemas
+	}
+	return nil
+}
+
+func GetAllForType(schemaType def.SchemaType) ([]string, error) {
+	registry.RLock()
+	defer registry.RUnlock()
+	if _, ok := registry.schemas[schemaType]; !ok {
+		return nil, fmt.Errorf("schema type %s not found", schemaType)
+	}
+	result := make([]string, 0, len(registry.schemas[schemaType]))
+	for k := range registry.schemas[schemaType] {
+		result = append(result, k)
+	}
+	return result, nil
+}
+
+func Register(info *Info) error {
+	if _, ok := registry.schemas[info.Type]; !ok {
+		return fmt.Errorf("schema type %s not found", info.Type)
+	}
+	if _, ok := registry.schemas[info.Type][info.Name]; ok {
+		return fmt.Errorf("schema %s.%s already registered", info.Type, info.Name)
+	}
+	return CreateOrUpdateSchema(info)
+}
+
+func CreateOrUpdateSchema(info *Info) error {
+	if _, ok := registry.schemas[info.Type]; !ok {
+		return fmt.Errorf("schema type %s not found", info.Type)
+	}
+	etcDir, _ := conf.GetConfLoc()
+	etcDir = filepath.Join(etcDir, string(info.Type))
+	if err := os.MkdirAll(etcDir, os.ModePerm); err != nil {
+		return err
+	}
+	schemaFile := filepath.Join(etcDir, info.Name+schemaExt[info.Type])
+	if _, err := os.Stat(schemaFile); os.IsNotExist(err) {
+		file, err := os.Create(schemaFile)
+		if err != nil {
+			return err
+		}
+		defer file.Close()
+	}
+	if info.Content != "" {
+		err := os.WriteFile(schemaFile, []byte(info.Content), 0666)
+		if err != nil {
+			return err
+		}
+	} else {
+		err := httpx.DownloadFile(schemaFile, info.FilePath)
+		if err != nil {
+			return err
+		}
+	}
+
+	registry.schemas[info.Type][info.Name] = schemaFile
+	return nil
+}
+
+func GetSchemaContent(schemaType def.SchemaType, name string) (*Info, error) {
+	registry.RLock()
+	defer registry.RUnlock()
+	if _, ok := registry.schemas[schemaType]; !ok {
+		return nil, fmt.Errorf("schema type %s not found", schemaType)
+	}
+	if _, ok := registry.schemas[schemaType][name]; !ok {
+		return nil, fmt.Errorf("schema %s.%s not found", schemaType, name)
+	}
+	schemaFile := registry.schemas[schemaType][name]
+	content, err := ioutil.ReadFile(schemaFile)
+	if err != nil {
+		return nil, fmt.Errorf("cannot read schema file %s: %s", schemaFile, err)
+	}
+	return &Info{
+		Type:     schemaType,
+		Name:     name,
+		Content:  string(content),
+		FilePath: schemaFile,
+	}, nil
+}
+
+func DeleteSchema(schemaType def.SchemaType, name string) error {
+	registry.Lock()
+	defer registry.Unlock()
+	if _, ok := registry.schemas[schemaType]; !ok {
+		return fmt.Errorf("schema type %s not found", schemaType)
+	}
+	if _, ok := registry.schemas[schemaType][name]; !ok {
+		return fmt.Errorf("schema %s.%s not found", schemaType, name)
+	}
+	schemaFile := registry.schemas[schemaType][name]
+	err := os.Remove(schemaFile)
+	if err != nil {
+		conf.Log.Errorf("cannot delete schema file %s: %s", schemaFile, err)
+	}
+	delete(registry.schemas[schemaType], name)
+	return nil
+}

+ 189 - 0
internal/schema/registry_test.go

@@ -0,0 +1,189 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schema
+
+import (
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/testx"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"path/filepath"
+	"reflect"
+	"testing"
+)
+
+func TestRegistry(t *testing.T) {
+	testx.InitEnv()
+	// Move test schema file to etc dir
+	etcDir, err := conf.GetConfLoc()
+	if err != nil {
+		t.Fatal(err)
+	}
+	etcDir = filepath.Join(etcDir, "protobuf")
+	err = os.MkdirAll(etcDir, os.ModePerm)
+	if err != nil {
+		t.Fatal(err)
+	}
+	//Copy init.proto
+	bytesRead, err := ioutil.ReadFile("test/init.proto")
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = ioutil.WriteFile(filepath.Join(etcDir, "init.proto"), bytesRead, 0755)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		err = os.RemoveAll(etcDir)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	err = InitRegistry()
+	if err != nil {
+		t.Errorf("InitRegistry error: %v", err)
+		return
+	}
+	s := httptest.NewServer(
+		http.FileServer(http.Dir("test")),
+	)
+	defer s.Close()
+	endpoint := s.URL
+	// Create 1 by file
+	schema1 := &Info{
+		Name:     "test1",
+		Type:     "protobuf",
+		FilePath: endpoint + "/test1.proto",
+	}
+	err = Register(schema1)
+	if err != nil {
+		t.Errorf("Register schema1 error: %v", err)
+		return
+	}
+	// Get 1
+	expectedSchema := &Info{
+		Type:     "protobuf",
+		Name:     "test1",
+		Content:  "syntax = \"proto3\";message Person {string name = 1;int32 id = 2;string email = 3;}",
+		FilePath: filepath.Join(etcDir, "test1.proto"),
+	}
+	gottenSchema, err := GetSchemaContent("protobuf", "test1")
+	if !reflect.DeepEqual(gottenSchema, expectedSchema) {
+		t.Errorf("Get test1 unmatch: Expect\n%v\nbut got\n%v", *expectedSchema, *gottenSchema)
+		return
+	}
+	// Create 2 by content
+	schema2 := &Info{
+		Name:    "test2",
+		Type:    "protobuf",
+		Content: "message Book{\n  required string name = 1;}",
+	}
+	err = Register(schema2)
+	if err != nil {
+		t.Errorf("Register schema2 error: %v", err)
+		return
+	}
+	// Update 2 by file
+	updatedSchema2 := &Info{
+		Name:     "test2",
+		Type:     "protobuf",
+		FilePath: endpoint + "/test2.proto",
+	}
+	err = CreateOrUpdateSchema(updatedSchema2)
+	if err != nil {
+		t.Errorf("Update Schema2 error: %v", err)
+		return
+	}
+	// List & check file
+	regSchemas, err := GetAllForType("protobuf")
+	expectedSchemas := []string{
+		"init", "test1", "test2",
+	}
+	if !reflect.DeepEqual(len(regSchemas), len(expectedSchemas)) {
+		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
+		return
+	}
+	checkFile(etcDir, expectedSchemas, t)
+	// Delete 2
+	err = DeleteSchema("protobuf", "test2")
+	if err != nil {
+		t.Errorf("Delete Schema2 error: %v", err)
+		return
+	}
+	// Update 1 by content
+	updatedSchema1 := &Info{
+		Name:    "test1",
+		Type:    "protobuf",
+		Content: "message Person{required string name = 1;required int32 id = 2;optional string email = 3;}",
+	}
+	err = CreateOrUpdateSchema(updatedSchema1)
+	if err != nil {
+		t.Errorf("Update Schema1 error: %v", err)
+		return
+	}
+	// List & check file
+	regSchemas, err = GetAllForType("protobuf")
+	expectedSchemas = []string{
+		"init", "test1",
+	}
+	if !reflect.DeepEqual(len(regSchemas), len(expectedSchemas)) {
+		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
+		return
+	}
+	checkFile(etcDir, expectedSchemas, t)
+	// Delete 1
+	err = DeleteSchema("protobuf", "test1")
+	if err != nil {
+		t.Errorf("Delete Schema1 error: %v", err)
+		return
+	}
+	// List & check file
+	regSchemas, err = GetAllForType("protobuf")
+	expectedSchemas = []string{
+		"init",
+	}
+	if !reflect.DeepEqual(regSchemas, expectedSchemas) {
+		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
+		return
+	}
+	checkFile(etcDir, expectedSchemas, t)
+}
+
+func checkFile(etcDir string, schemas []string, t *testing.T) {
+	files, err := ioutil.ReadDir(etcDir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(files) != len(schemas) {
+		t.Errorf("Expect %d files but got %d", len(schemas), len(files))
+		return
+	}
+	for _, file := range files {
+		fileName := filepath.Base(file.Name())
+		found := false
+		for _, schema := range schemas {
+			if fileName == schema+".proto" {
+				found = true
+				break
+			}
+		}
+		if !found {
+			t.Errorf("Expect %s but got %s", schemas, fileName)
+			return
+		}
+	}
+}

+ 30 - 0
internal/schema/schema.go

@@ -0,0 +1,30 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schema
+
+import "github.com/lf-edge/ekuiper/internal/pkg/def"
+
+type Info struct {
+	Type     def.SchemaType `json:"type"`
+	Name     string         `json:"name"`
+	Content  string         `json:"content"`
+	FilePath string         `json:"file"`
+}
+
+var (
+	schemaExt = map[def.SchemaType]string{
+		def.PROTOBUF: ".proto",
+	}
+)

+ 5 - 0
internal/schema/test/init.proto

@@ -0,0 +1,5 @@
+syntax = "proto3";
+
+message HelloRequest {
+  string name = 1;
+}

+ 1 - 0
internal/schema/test/test1.proto

@@ -0,0 +1 @@
+syntax = "proto3";message Person {string name = 1;int32 id = 2;string email = 3;}

+ 4 - 0
internal/schema/test/test2.proto

@@ -0,0 +1,4 @@
+message Book {
+  required string name = 1;
+  required string author = 2;
+}

+ 129 - 0
internal/server/schema_init.go

@@ -0,0 +1,129 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build schema || !core
+// +build schema !core
+
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/pkg/def"
+	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
+	"net/http"
+)
+
+func init() {
+	components["schema"] = schemaComp{}
+}
+
+type schemaComp struct{}
+
+func (sc schemaComp) register() {
+	err := schema.InitRegistry()
+	if err != nil {
+		panic(err)
+	}
+}
+
+func (sc schemaComp) rest(r *mux.Router) {
+	r.HandleFunc("/schemas/{type}", schemasHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/schemas/{type}/{name}", schemaHandler).Methods(http.MethodPut, http.MethodDelete, http.MethodGet)
+}
+
+func schemasHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	st := vars["type"]
+	defer r.Body.Close()
+	switch r.Method {
+	case http.MethodGet:
+		l, err := schema.GetAllForType(def.SchemaType(st))
+		if err != nil {
+			handleError(w, err, "", logger)
+			return
+		}
+		jsonResponse(l, w, logger)
+	case http.MethodPost:
+		sch := &schema.Info{Type: def.SchemaType(st)}
+		err := json.NewDecoder(r.Body).Decode(sch)
+		if err != nil {
+			handleError(w, err, "Invalid body: Error decoding schema json", logger)
+			return
+		}
+		if sch.Content != "" && sch.FilePath != "" {
+			handleError(w, nil, "Invalid body: Cannot specify both content and file", logger)
+			return
+		}
+		err = schema.Register(sch)
+		if err != nil {
+			handleError(w, err, "schema create command error", logger)
+			return
+		}
+		w.WriteHeader(http.StatusCreated)
+		w.Write([]byte(fmt.Sprintf("%s schema %s is created", sch.Type, sch.Name)))
+	}
+}
+
+func schemaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	st := vars["type"]
+	name := vars["name"]
+	switch r.Method {
+	case http.MethodGet:
+		j, err := schema.GetSchemaContent(def.SchemaType(st), name)
+		if err != nil {
+			handleError(w, err, "", logger)
+			return
+		} else if j == nil {
+			handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), "", logger)
+			return
+		}
+		jsonResponse(j, w, logger)
+	case http.MethodDelete:
+		err := schema.DeleteSchema(def.SchemaType(st), name)
+		if err != nil {
+			handleError(w, err, fmt.Sprintf("delete %s schema %s error", st, name), logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		result := fmt.Sprintf("%s schema %s is deleted", st, name)
+		w.Write([]byte(result))
+	case http.MethodPut:
+		sch := &schema.Info{Type: def.SchemaType(st), Name: name}
+		err := json.NewDecoder(r.Body).Decode(sch)
+		if err != nil {
+			handleError(w, err, "Invalid body: Error decoding schema json", logger)
+			return
+		}
+		if sch.Type != def.SchemaType(st) || sch.Name != name {
+			handleError(w, nil, "Invalid body: Type or name does not match", logger)
+			return
+		}
+		if sch.Content != "" && sch.FilePath != "" {
+			handleError(w, nil, "Invalid body: Cannot specify both content and file", logger)
+			return
+		}
+		err = schema.CreateOrUpdateSchema(sch)
+		if err != nil {
+			handleError(w, err, "schema update command error", logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte(fmt.Sprintf("%s schema %s is updated", sch.Type, sch.Name)))
+	}
+}