Forráskód Böngészése

refactor(schema): rename schema type static to custom

And add schema validation

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 éve
szülő
commit
9a8d308b53

+ 1 - 1
.github/workflows/run_test_case.yaml

@@ -50,7 +50,7 @@ jobs:
         go build -modfile extensions.mod --buildmode=plugin -o plugins/functions/Echo.so extensions/functions/echo/echo.go
         go build -modfile extensions.mod --buildmode=plugin -o plugins/functions/CountPlusOne@v1.0.0.so extensions/functions/countPlusOne/countPlusOne.go
         go build -modfile extensions.mod --buildmode=plugin -o plugins/functions/AccumulateWordCount@v1.0.0.so extensions/functions/accumulateWordCount/accumulateWordCount.go
-        go build --buildmode=plugin -o data/test/schemas/static/helloworld.so internal/converter/static/test/*.go
+        go build --buildmode=plugin -o data/test/helloworld.so internal/converter/protobuf/test/*.go
         mkdir -p plugins/portable/mirror
         cd sdk/go/example/mirror
         go build -o ../../../../plugins/portable/mirror/mirror .

+ 7 - 7
internal/converter/converter.go

@@ -17,28 +17,28 @@ package converter
 import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/converter/binary"
+	"github.com/lf-edge/ekuiper/internal/converter/custom"
 	"github.com/lf-edge/ekuiper/internal/converter/json"
-	"github.com/lf-edge/ekuiper/internal/converter/static"
 	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
-type Instantiator func(t string, schemaFile string, schemaName string) (message.Converter, error)
+type Instantiator func(t string, schemaFileName string, SchemaMessageName string) (message.Converter, error)
 
 var ( // init once and read only
 	converters = map[string]Instantiator{
-		message.FormatJson: func(t string, schemaFile string, schemaName string) (message.Converter, error) {
+		message.FormatJson: func(t string, schemaFileName string, SchemaMessageName string) (message.Converter, error) {
 			return json.GetConverter()
 		},
-		message.FormatBinary: func(t string, schemaFile string, schemaName string) (message.Converter, error) {
+		message.FormatBinary: func(t string, schemaFileName string, SchemaMessageName string) (message.Converter, error) {
 			return binary.GetConverter()
 		},
-		message.FormatStatic: static.LoadConverter,
+		message.FormatCustom: custom.LoadConverter,
 	}
 )
 
-func GetOrCreateConverter(t string, schemaFile string, schemaName string) (message.Converter, error) {
+func GetOrCreateConverter(t string, schemaFileName string, SchemaMessageName string) (message.Converter, error) {
 	if c, ok := converters[t]; ok {
-		return c(t, schemaFile, schemaName)
+		return c(t, schemaFileName, SchemaMessageName)
 	}
 	return nil, fmt.Errorf("format type %s not supported", t)
 }

+ 41 - 0
internal/converter/custom/converter.go

@@ -0,0 +1,41 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package custom
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/converter/static"
+	"github.com/lf-edge/ekuiper/internal/pkg/def"
+	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/pkg/message"
+)
+
+type Converter struct {
+}
+
+var converter = &Converter{}
+
+func LoadConverter(t string, schemaFile string, messageName string) (message.Converter, error) {
+	conf.Log.Infof("Load custom converter from file %s, for symbol Get%s", schemaFile, messageName)
+	ffs, err := schema.GetSchemaFile(def.SchemaType(t), schemaFile)
+	if err != nil {
+		return nil, err
+	}
+	if ffs.SoFile == "" {
+		return nil, fmt.Errorf("no so file found for custom schema %s", messageName)
+	}
+	return static.LoadStaticConverter(ffs.SoFile, messageName)
+}

+ 36 - 6
internal/converter/static/converter_test.go

@@ -12,22 +12,52 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package static
+package custom
 
 import (
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/schema"
 	"github.com/lf-edge/ekuiper/internal/testx"
+	"os"
+	"path/filepath"
 	"reflect"
 	"testing"
 )
 
-func init() {
+func TestCustomConverter(t *testing.T) {
+	dataDir, err := conf.GetDataLoc()
+	if err != nil {
+		t.Fatal(err)
+	}
+	etcDir := filepath.Join(dataDir, "schemas", "custom")
+	err = os.MkdirAll(etcDir, os.ModePerm)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		err = os.RemoveAll(etcDir)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	// build the so file into data/test prior to running the test
+	//Copy the helloworld.so
+	bytesRead, err := os.ReadFile(filepath.Join(dataDir, "helloworld.so"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = os.WriteFile(filepath.Join(etcDir, "helloworld.so"), bytesRead, 0755)
+	if err != nil {
+		t.Fatal(err)
+	}
 	schema.InitRegistry()
+	testEncode(t)
+	testDecode(t)
 }
 
-func TestEncode(t *testing.T) {
-	c, err := LoadConverter("static", "helloworld", "HelloReply")
+func testEncode(t *testing.T) {
+	c, err := LoadConverter("custom", "helloworld", "HelloReply")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -59,8 +89,8 @@ func TestEncode(t *testing.T) {
 	}
 }
 
-func TestDecode(t *testing.T) {
-	c, err := LoadConverter("static", "helloworld", "HelloRequest")
+func testDecode(t *testing.T) {
+	c, err := LoadConverter("custom", "helloworld", "HelloRequest")
 	if err != nil {
 		t.Fatal(err)
 	}

+ 3 - 3
internal/converter/ext_protobuf.go

@@ -25,11 +25,11 @@ import (
 )
 
 func init() {
-	converters[message.FormatProtobuf] = func(t string, schemaFile string, schemaName string) (message.Converter, error) {
-		fileName, err := schema.GetSchemaFile(def.SchemaType(t), schemaFile)
+	converters[message.FormatProtobuf] = func(t string, schemaFileName string, schemaMessageName string) (message.Converter, error) {
+		ffs, err := schema.GetSchemaFile(def.SchemaType(t), schemaFileName)
 		if err != nil {
 			return nil, err
 		}
-		return protobuf.NewConverter(fileName, schemaName)
+		return protobuf.NewConverter(ffs.SchemaFile, ffs.SoFile, schemaMessageName)
 	}
 }

+ 16 - 14
internal/converter/protobuf/converter.go

@@ -18,6 +18,8 @@ import (
 	"fmt"
 	"github.com/jhump/protoreflect/desc"
 	"github.com/jhump/protoreflect/desc/protoparse"
+	"github.com/lf-edge/ekuiper/internal/converter/static"
+	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
 type Converter struct {
@@ -28,25 +30,25 @@ type Converter struct {
 var protoParser *protoparse.Parser
 
 func init() {
-	//etcDir, err := conf.GetConfLoc()
-	//if err != nil {
-	//	panic(err)
-	//}
 	protoParser = &protoparse.Parser{}
 }
 
-func NewConverter(fileName string, messageName string) (*Converter, error) {
-	if fds, err := protoParser.ParseFiles(fileName); err != nil {
-		return nil, fmt.Errorf("parse schema file %s failed: %s", fileName, err)
+func NewConverter(schemaFile string, soFile string, messageName string) (message.Converter, error) {
+	if soFile != "" {
+		return static.LoadStaticConverter(soFile, messageName)
 	} else {
-		messageDescriptor := fds[0].FindMessage(messageName)
-		if messageDescriptor == nil {
-			return nil, fmt.Errorf("message type %s not found in schema file %s", messageName, fileName)
+		if fds, err := protoParser.ParseFiles(schemaFile); err != nil {
+			return nil, fmt.Errorf("parse schema file %s failed: %s", schemaFile, err)
+		} else {
+			messageDescriptor := fds[0].FindMessage(messageName)
+			if messageDescriptor == nil {
+				return nil, fmt.Errorf("message type %s not found in schema file %s", messageName, schemaFile)
+			}
+			return &Converter{
+				descriptor: messageDescriptor,
+				fc:         GetFieldConverter(),
+			}, nil
 		}
-		return &Converter{
-			descriptor: messageDescriptor,
-			fc:         GetFieldConverter(),
-		}, nil
 	}
 }
 

+ 71 - 2
internal/converter/protobuf/converter_test.go

@@ -16,13 +16,17 @@ package protobuf
 
 import (
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/schema"
 	"github.com/lf-edge/ekuiper/internal/testx"
+	"os"
+	"path/filepath"
 	"reflect"
 	"testing"
 )
 
 func TestEncode(t *testing.T) {
-	c, err := NewConverter("../../schema/test/test1.proto", "Person")
+	c, err := NewConverter("../../schema/test/test1.proto", "", "Person")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -59,7 +63,7 @@ func TestEncode(t *testing.T) {
 }
 
 func TestDecode(t *testing.T) {
-	c, err := NewConverter("../../schema/test/test1.proto", "Person")
+	c, err := NewConverter("../../schema/test/test1.proto", "", "Person")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -88,3 +92,68 @@ func TestDecode(t *testing.T) {
 		}
 	}
 }
+
+func TestStatic(t *testing.T) {
+	dataDir, err := conf.GetDataLoc()
+	if err != nil {
+		t.Fatal(err)
+	}
+	etcDir := filepath.Join(dataDir, "schemas", "custom")
+	err = os.MkdirAll(etcDir, os.ModePerm)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		err = os.RemoveAll(etcDir)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	// build the so file into data/test prior to running the test
+	//Copy the helloworld.so
+	bytesRead, err := os.ReadFile(filepath.Join(dataDir, "helloworld.so"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = os.WriteFile(filepath.Join(etcDir, "helloworld.so"), bytesRead, 0755)
+	if err != nil {
+		t.Fatal(err)
+	}
+	schema.InitRegistry()
+	c, err := NewConverter("../../schema/test/test1.proto", "../../../data/test/schemas/custom/helloworld.so", "HelloReply")
+	if err != nil {
+		t.Fatal(err)
+	}
+	tests := []struct {
+		m map[string]interface{}
+		r []byte
+		e string
+	}{
+		{
+			m: map[string]interface{}{
+				"message": "test",
+			},
+			r: []byte{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74},
+		}, {
+			m: map[string]interface{}{
+				"message": "another test 2",
+			},
+			r: []byte{0x0a, 0x0e, 0x61, 0x6e, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x20, 0x74, 0x65, 0x73, 0x74, 0x20, 0x32},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		a, err := c.Encode(tt.m)
+		if !reflect.DeepEqual(tt.e, testx.Errstring(err)) {
+			t.Errorf("%d.error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.e, err)
+		} else if tt.e == "" && !reflect.DeepEqual(tt.r, a) {
+			t.Errorf("%d. \n\nresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.r, a)
+		}
+		m, err := c.Decode(tt.r)
+		if !reflect.DeepEqual(tt.e, testx.Errstring(err)) {
+			t.Errorf("%d.error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.e, err)
+		} else if tt.e == "" && !reflect.DeepEqual(tt.m, m) {
+			t.Errorf("%d. \n\nresult mismatch:\n\nexp=%v\n\ngot=%v\n\n", i, tt.m, m)
+		}
+	}
+}

internal/converter/static/test/helloworld.pb.go → internal/converter/protobuf/test/helloworld.pb.go


internal/converter/static/test/helloworld.proto → internal/converter/protobuf/test/helloworld.proto


internal/converter/static/test/helloworld_wrapper.go → internal/converter/protobuf/test/helloworld_wrapper.go


internal/converter/static/test/wrapper_test.go → internal/converter/protobuf/test/wrapper_test.go


+ 9 - 21
internal/converter/static/converter.go

@@ -17,42 +17,30 @@ package static
 import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/pkg/def"
-	"github.com/lf-edge/ekuiper/internal/schema"
 	"github.com/lf-edge/ekuiper/pkg/message"
 	"plugin"
 )
 
-type Converter struct {
-}
-
-var converter = &Converter{}
-
-func LoadConverter(t string, schemaFile string, schemaId string) (message.Converter, error) {
-	conf.Log.Infof("Load static converter from file %s, for symbol Get%s", schemaFile, schemaId)
-	fileName, err := schema.GetSchemaFile(def.SchemaType(t), schemaFile)
-	if err != nil {
-		return nil, err
-	}
-	sp, err := plugin.Open(fileName)
+func LoadStaticConverter(soFile string, messageName string) (message.Converter, error) {
+	sp, err := plugin.Open(soFile)
 	if err != nil {
-		conf.Log.Errorf(fmt.Sprintf("static schema file %s open error: %v", fileName, err))
-		return nil, fmt.Errorf("cannot open %s: %v", fileName, err)
+		conf.Log.Errorf(fmt.Sprintf("custom schema file %s open error: %v", soFile, err))
+		return nil, fmt.Errorf("cannot open %s: %v", soFile, err)
 	}
-	nf, err := sp.Lookup("Get" + schemaId)
+	nf, err := sp.Lookup("Get" + messageName)
 	if err != nil {
-		conf.Log.Warnf(fmt.Sprintf("cannot find schemaId %s, please check if it is exported: Get%v", schemaId, err))
+		conf.Log.Warnf(fmt.Sprintf("cannot find schemaId %s, please check if it is exported: Get%v", messageName, err))
 		return nil, nil
 	}
 	nff, ok := nf.(func() interface{})
 	if !ok {
-		conf.Log.Errorf("exported symbol Get%s is not func to return interface{}", schemaId)
-		return nil, fmt.Errorf("load static converter %s.%s error", schemaFile, schemaId)
+		conf.Log.Errorf("exported symbol Get%s is not func to return interface{}", messageName)
+		return nil, fmt.Errorf("load static converter %s, message %s error", soFile, messageName)
 	}
 	mc, ok := nff().(message.Converter)
 	if ok {
 		return mc, nil
 	} else {
-		return nil, fmt.Errorf("get schema converter failed, exported symbol %s is not type of message.Converter", schemaId)
+		return nil, fmt.Errorf("get schema converter failed, exported symbol %s is not type of message.Converter", messageName)
 	}
 }

+ 2 - 2
internal/pkg/def/schema.go

@@ -18,10 +18,10 @@ type SchemaType string
 
 const (
 	PROTOBUF SchemaType = "protobuf"
-	STATIC   SchemaType = "static"
+	CUSTOM   SchemaType = "custom"
 )
 
 var SchemaTypes = []SchemaType{
 	PROTOBUF,
-	STATIC,
+	CUSTOM,
 }

+ 83 - 37
internal/schema/registry.go

@@ -29,13 +29,18 @@ import (
 // Initialize in the server startup
 var registry *Registry
 
+type Files struct {
+	SchemaFile string
+	SoFile     string
+}
+
 // Registry is a global registry for schemas
 // It stores the schema ids and the ref to its file content in memory
 // The schema definition is stored in the file system and will only be loaded once used
 type Registry struct {
 	sync.RWMutex
 	// The map of schema files for all types
-	schemas map[def.SchemaType]map[string]string
+	schemas map[def.SchemaType]map[string]*Files
 }
 
 // Registry provide the method to add, update, get and parse and delete schemas
@@ -43,7 +48,7 @@ type Registry struct {
 // InitRegistry initialize the registry, only called once by the server
 func InitRegistry() error {
 	registry = &Registry{
-		schemas: make(map[def.SchemaType]map[string]string, len(def.SchemaTypes)),
+		schemas: make(map[def.SchemaType]map[string]*Files, len(def.SchemaTypes)),
 	}
 	dataDir, err := conf.GetDataLoc()
 	if err != nil {
@@ -51,18 +56,29 @@ func InitRegistry() error {
 	}
 	for _, schemaType := range def.SchemaTypes {
 		schemaDir := filepath.Join(dataDir, "schemas", string(schemaType))
-		var newSchemas map[string]string
+		var newSchemas map[string]*Files
 		files, err := os.ReadDir(schemaDir)
 		if err != nil {
 			conf.Log.Warnf("cannot read schema directory: %s", err)
-			newSchemas = make(map[string]string)
+			newSchemas = make(map[string]*Files)
 		} else {
-			newSchemas = make(map[string]string, len(files))
+			newSchemas = make(map[string]*Files, len(files))
 			for _, file := range files {
 				fileName := filepath.Base(file.Name())
+				ext := filepath.Ext(fileName)
 				schemaId := strings.TrimSuffix(fileName, filepath.Ext(fileName))
-				newSchemas[schemaId] = filepath.Join(schemaDir, file.Name())
-				conf.Log.Infof("schema %s.%s loaded", schemaType, schemaId)
+				ffs, ok := newSchemas[schemaId]
+				if !ok {
+					ffs = &Files{}
+					newSchemas[schemaId] = ffs
+				}
+				switch ext {
+				case ".so":
+					ffs.SoFile = filepath.Join(schemaDir, file.Name())
+				default:
+					ffs.SchemaFile = filepath.Join(schemaDir, file.Name())
+				}
+				conf.Log.Infof("schema file %s.%s loaded", schemaType, schemaId)
 			}
 		}
 		registry.schemas[schemaType] = newSchemas
@@ -102,27 +118,40 @@ func CreateOrUpdateSchema(info *Info) error {
 	if err := os.MkdirAll(etcDir, os.ModePerm); err != nil {
 		return err
 	}
-	schemaFile := filepath.Join(etcDir, info.Name+schemaExt[info.Type])
-	if _, err := os.Stat(schemaFile); os.IsNotExist(err) {
-		file, err := os.Create(schemaFile)
-		if err != nil {
-			return err
+	ffs := &Files{}
+	if info.Content != "" || info.FilePath != "" {
+		schemaFile := filepath.Join(etcDir, info.Name+schemaExt[info.Type])
+		if _, err := os.Stat(schemaFile); os.IsNotExist(err) {
+			file, err := os.Create(schemaFile)
+			if err != nil {
+				return err
+			}
+			defer file.Close()
 		}
-		defer file.Close()
-	}
-	if info.Content != "" {
-		err := os.WriteFile(schemaFile, []byte(info.Content), 0666)
-		if err != nil {
-			return err
+		if info.Content != "" {
+			err := os.WriteFile(schemaFile, []byte(info.Content), 0666)
+			if err != nil {
+				return err
+			}
+		} else {
+			err := httpx.DownloadFile(schemaFile, info.FilePath)
+			if err != nil {
+				return err
+			}
 		}
-	} else {
-		err := httpx.DownloadFile(schemaFile, info.FilePath)
+		ffs.SchemaFile = schemaFile
+	}
+
+	if info.SoPath != "" {
+		soFile := filepath.Join(etcDir, info.Name+".so")
+		err := httpx.DownloadFile(soFile, info.SoPath)
 		if err != nil {
 			return err
 		}
+		ffs.SoFile = soFile
 	}
 
-	registry.schemas[info.Type][info.Name] = schemaFile
+	registry.schemas[info.Type][info.Name] = ffs
 	return nil
 }
 
@@ -131,26 +160,35 @@ func GetSchema(schemaType def.SchemaType, name string) (*Info, error) {
 	if err != nil {
 		return nil, err
 	}
-	content, err := os.ReadFile(schemaFile)
-	if err != nil {
-		return nil, fmt.Errorf("cannot read schema file %s: %s", schemaFile, err)
-	}
-	return &Info{
-		Type:     schemaType,
-		Name:     name,
-		Content:  string(content),
-		FilePath: schemaFile,
-	}, nil
+	if schemaFile.SchemaFile != "" {
+		content, err := os.ReadFile(schemaFile.SchemaFile)
+		if err != nil {
+			return nil, fmt.Errorf("cannot read schema file %s: %s", schemaFile, err)
+		}
+		return &Info{
+			Type:     schemaType,
+			Name:     name,
+			Content:  string(content),
+			FilePath: schemaFile.SchemaFile,
+		}, nil
+	} else {
+		return &Info{
+			Type:   schemaType,
+			Name:   name,
+			SoPath: schemaFile.SoFile,
+		}, nil
+	}
+
 }
 
-func GetSchemaFile(schemaType def.SchemaType, name string) (string, error) {
+func GetSchemaFile(schemaType def.SchemaType, name string) (*Files, error) {
 	registry.RLock()
 	defer registry.RUnlock()
 	if _, ok := registry.schemas[schemaType]; !ok {
-		return "", fmt.Errorf("schema type %s not found", schemaType)
+		return nil, fmt.Errorf("schema type %s not found", schemaType)
 	}
 	if _, ok := registry.schemas[schemaType][name]; !ok {
-		return "", fmt.Errorf("schema type %s, file %s not found", schemaType, name)
+		return nil, fmt.Errorf("schema type %s, file %s not found", schemaType, name)
 	}
 	schemaFile := registry.schemas[schemaType][name]
 	return schemaFile, nil
@@ -166,9 +204,17 @@ func DeleteSchema(schemaType def.SchemaType, name string) error {
 		return fmt.Errorf("schema %s.%s not found", schemaType, name)
 	}
 	schemaFile := registry.schemas[schemaType][name]
-	err := os.Remove(schemaFile)
-	if err != nil {
-		conf.Log.Errorf("cannot delete schema file %s: %s", schemaFile, err)
+	if schemaFile.SchemaFile != "" {
+		err := os.Remove(schemaFile.SchemaFile)
+		if err != nil {
+			conf.Log.Errorf("cannot delete schema file %s: %s", schemaFile.SchemaFile, err)
+		}
+	}
+	if schemaFile.SoFile != "" {
+		err := os.Remove(schemaFile.SoFile)
+		if err != nil {
+			conf.Log.Errorf("cannot delete schema so file %s: %s", schemaFile.SoFile, err)
+		}
 	}
 	delete(registry.schemas[schemaType], name)
 	return nil

+ 119 - 5
internal/schema/registry_test.go

@@ -26,7 +26,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/testx"
 )
 
-func TestRegistry(t *testing.T) {
+func TestProtoRegistry(t *testing.T) {
 	testx.InitEnv()
 	// Move test schema file to etc dir
 	etcDir, err := conf.GetDataLoc()
@@ -102,6 +102,7 @@ func TestRegistry(t *testing.T) {
 		Name:     "test2",
 		Type:     "protobuf",
 		FilePath: endpoint + "/test2.proto",
+		SoPath:   endpoint + "/fake.so",
 	}
 	err = CreateOrUpdateSchema(updatedSchema2)
 	if err != nil {
@@ -117,7 +118,10 @@ func TestRegistry(t *testing.T) {
 		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
 		return
 	}
-	checkFile(etcDir, expectedSchemas, t)
+	expectedFiles := []string{
+		"init.proto", "test1.proto", "test2.proto", "test2.so",
+	}
+	checkFile(etcDir, expectedFiles, t)
 	// Delete 2
 	err = DeleteSchema("protobuf", "test2")
 	if err != nil {
@@ -144,7 +148,10 @@ func TestRegistry(t *testing.T) {
 		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
 		return
 	}
-	checkFile(etcDir, expectedSchemas, t)
+	expectedFiles = []string{
+		"init.proto", "test1.proto",
+	}
+	checkFile(etcDir, expectedFiles, t)
 	// Delete 1
 	err = DeleteSchema("protobuf", "test1")
 	if err != nil {
@@ -160,7 +167,114 @@ func TestRegistry(t *testing.T) {
 		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
 		return
 	}
-	checkFile(etcDir, expectedSchemas, t)
+	expectedFiles = []string{
+		"init.proto",
+	}
+	checkFile(etcDir, expectedFiles, t)
+}
+
+func TestCustomRegistry(t *testing.T) {
+	testx.InitEnv()
+	// Move test schema file to etc dir
+	etcDir, err := conf.GetDataLoc()
+	if err != nil {
+		t.Fatal(err)
+	}
+	etcDir = filepath.Join(etcDir, "schemas", "custom")
+	err = os.MkdirAll(etcDir, os.ModePerm)
+	if err != nil {
+		t.Fatal(err)
+	}
+	//Copy fake.so as init
+	bytesRead, err := os.ReadFile("test/fake.so")
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = os.WriteFile(filepath.Join(etcDir, "init.so"), bytesRead, 0755)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		err = os.RemoveAll(etcDir)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	err = InitRegistry()
+	if err != nil {
+		t.Errorf("InitRegistry error: %v", err)
+		return
+	}
+	s := httptest.NewServer(
+		http.FileServer(http.Dir("test")),
+	)
+	defer s.Close()
+	endpoint := s.URL
+	// Create 1 by file
+	schema1 := &Info{
+		Name:   "test1",
+		Type:   "custom",
+		SoPath: endpoint + "/fake.so",
+	}
+	err = Register(schema1)
+	if err != nil {
+		t.Errorf("Register schema1 error: %v", err)
+		return
+	}
+	// Get 1
+	expectedSchema := &Info{
+		Type:   "custom",
+		Name:   "test1",
+		SoPath: filepath.Join(etcDir, "test1.so"),
+	}
+	gottenSchema, err := GetSchema("custom", "test1")
+	if !reflect.DeepEqual(gottenSchema, expectedSchema) {
+		t.Errorf("Get test1 unmatch: Expect\n%v\nbut got\n%v", *expectedSchema, *gottenSchema)
+		return
+	}
+	// Update 1 by file
+	updatedSchema2 := &Info{
+		Name:   "test1",
+		Type:   "custom",
+		SoPath: endpoint + "/fake.so",
+	}
+	err = CreateOrUpdateSchema(updatedSchema2)
+	if err != nil {
+		t.Errorf("Update Schema2 error: %v", err)
+		return
+	}
+	// List & check file
+	regSchemas, err := GetAllForType("custom")
+	expectedSchemas := []string{
+		"init", "test1",
+	}
+	if !reflect.DeepEqual(len(regSchemas), len(expectedSchemas)) {
+		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
+		return
+	}
+	expectedFiles := []string{
+		"init.so", "test1.so",
+	}
+	checkFile(etcDir, expectedFiles, t)
+	// Delete 2
+	err = DeleteSchema("custom", "init")
+	if err != nil {
+		t.Errorf("Delete Schema2 error: %v", err)
+		return
+	}
+	// List & check file
+	regSchemas, err = GetAllForType("custom")
+	expectedSchemas = []string{
+		"test1",
+	}
+	if !reflect.DeepEqual(len(regSchemas), len(expectedSchemas)) {
+		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
+		return
+	}
+	expectedFiles = []string{
+		"test1.so",
+	}
+	checkFile(etcDir, expectedFiles, t)
 }
 
 func checkFile(etcDir string, schemas []string, t *testing.T) {
@@ -176,7 +290,7 @@ func checkFile(etcDir string, schemas []string, t *testing.T) {
 		fileName := filepath.Base(file.Name())
 		found := false
 		for _, schema := range schemas {
-			if fileName == schema+".proto" {
+			if fileName == schema {
 				found = true
 				break
 			}

+ 24 - 0
internal/schema/schema.go

@@ -15,6 +15,7 @@
 package schema
 
 import (
+	"fmt"
 	"github.com/lf-edge/ekuiper/internal/pkg/def"
 )
 
@@ -23,6 +24,29 @@ type Info struct {
 	Name     string         `json:"name"`
 	Content  string         `json:"content"`
 	FilePath string         `json:"file"`
+	SoPath   string         `json:"soFile"`
+}
+
+func (i *Info) Validate() error {
+	if i.Name == "" {
+		return fmt.Errorf("name is required")
+	}
+	if i.Content != "" && i.FilePath != "" {
+		return fmt.Errorf("cannot specify both content and file")
+	}
+	switch i.Type {
+	case def.PROTOBUF:
+		if i.Content == "" && i.FilePath == "" {
+			return fmt.Errorf("must specify content or file")
+		}
+	case def.CUSTOM:
+		if i.SoPath == "" {
+			return fmt.Errorf("soFile is required")
+		}
+	default:
+		return fmt.Errorf("unsupported type: %s", i.Type)
+	}
+	return nil
 }
 
 var (

+ 99 - 0
internal/schema/schema_test.go

@@ -0,0 +1,99 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schema
+
+import (
+	"errors"
+	"fmt"
+	"reflect"
+	"testing"
+)
+
+func TestSchemaInfo(t *testing.T) {
+	var tests = []struct {
+		i   *Info
+		err error
+	}{
+		{
+			i: &Info{
+				Type:    "static",
+				Name:    "aa",
+				Content: "bb",
+				SoPath:  "dd",
+			},
+			err: errors.New("unsupported type: static"),
+		}, {
+			i: &Info{
+				Type:     "static",
+				Name:     "aa",
+				Content:  "bb",
+				FilePath: "cc",
+				SoPath:   "dd",
+			},
+			err: errors.New("cannot specify both content and file"),
+		},
+		{
+			i: &Info{
+				Type:     "protobuf",
+				FilePath: "cc",
+				SoPath:   "dd",
+			},
+			err: errors.New("name is required"),
+		}, {
+			i: &Info{
+				Type:   "protobuf",
+				Name:   "aa",
+				SoPath: "dd",
+			},
+			err: errors.New("must specify content or file"),
+		}, {
+			i: &Info{
+				Type:    "protobuf",
+				Name:    "aa",
+				Content: "bb",
+				SoPath:  "dd",
+			},
+			err: nil,
+		}, {
+			i: &Info{
+				Type:    "protobuf",
+				Name:    "aa",
+				Content: "bb",
+			},
+			err: nil,
+		}, {
+			i: &Info{
+				Type:   "custom",
+				Name:   "aa",
+				SoPath: "bb",
+			},
+			err: nil,
+		}, {
+			i: &Info{
+				Type:    "custom",
+				Name:    "aa",
+				Content: "bb",
+			},
+			err: errors.New("soFile is required"),
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		err := tt.i.Validate()
+		if !reflect.DeepEqual(err, tt.err) {
+			t.Errorf("%d failed,\n expect: %v, \nbut got: %v", i, tt.err, err)
+		}
+	}
+}

+ 0 - 0
internal/schema/test/fake.so


+ 4 - 4
internal/server/schema_init.go

@@ -64,8 +64,8 @@ func schemasHandler(w http.ResponseWriter, r *http.Request) {
 			handleError(w, err, "Invalid body: Error decoding schema json", logger)
 			return
 		}
-		if sch.Content != "" && sch.FilePath != "" {
-			handleError(w, nil, "Invalid body: Cannot specify both content and file", logger)
+		if err = sch.Validate(); err != nil {
+			handleError(w, nil, "Invalid body", logger)
 			return
 		}
 		err = schema.Register(sch)
@@ -114,8 +114,8 @@ func schemaHandler(w http.ResponseWriter, r *http.Request) {
 			handleError(w, nil, "Invalid body: Type or name does not match", logger)
 			return
 		}
-		if sch.Content != "" && sch.FilePath != "" {
-			handleError(w, nil, "Invalid body: Cannot specify both content and file", logger)
+		if err = sch.Validate(); err != nil {
+			handleError(w, nil, "Invalid body", logger)
 			return
 		}
 		err = schema.CreateOrUpdateSchema(sch)

+ 1 - 1
internal/topo/node/sink_node.go

@@ -281,7 +281,7 @@ func (m *SinkNode) parseConf(logger api.Logger) (*SinkConf, error) {
 	m.concurrency = sconf.Concurrency
 	if sconf.Format == "" {
 		sconf.Format = "json"
-	} else if sconf.Format != message.FormatJson && sconf.Format != message.FormatProtobuf && sconf.Format != message.FormatBinary && sconf.Format != message.FormatStatic {
+	} else if sconf.Format != message.FormatJson && sconf.Format != message.FormatProtobuf && sconf.Format != message.FormatBinary && sconf.Format != message.FormatCustom {
 		logger.Warnf("invalid type for format property, should be json protobuf or binary but found %s", sconf.Format)
 		sconf.Format = "json"
 	}

+ 2 - 2
internal/topo/transform/template.go

@@ -34,7 +34,7 @@ func GenTransform(dt string, format string, schemaId string) (TransFunc, error)
 		err error
 	)
 	switch format {
-	case message.FormatProtobuf, message.FormatStatic:
+	case message.FormatProtobuf, message.FormatCustom:
 		r := strings.Split(schemaId, ".")
 		if len(r) != 2 {
 			return nil, fmt.Errorf("invalid schemaId: %s", schemaId)
@@ -73,7 +73,7 @@ func GenTransform(dt string, format string, schemaId string) (TransFunc, error)
 			}
 			j, err := json.Marshal(d)
 			return j, false, err
-		case message.FormatProtobuf, message.FormatStatic:
+		case message.FormatProtobuf, message.FormatCustom:
 			if transformed {
 				m := make(map[string]interface{})
 				err := json.Unmarshal(bs, &m)

+ 2 - 2
pkg/message/artifacts.go

@@ -18,7 +18,7 @@ const (
 	FormatBinary   = "binary"
 	FormatJson     = "json"
 	FormatProtobuf = "protobuf"
-	FormatStatic   = "static"
+	FormatCustom   = "custom"
 
 	DefaultField = "self"
 	MetaKey      = "__meta"
@@ -26,7 +26,7 @@ const (
 
 func IsFormatSupported(format string) bool {
 	switch format {
-	case FormatBinary, FormatJson, FormatProtobuf, FormatStatic:
+	case FormatBinary, FormatJson, FormatProtobuf, FormatCustom:
 		return true
 	default:
 		return false