Переглянути джерело

refactor: move all protobuf support to schema feature tag

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 роки тому
батько
коміт
519996e8fb

+ 1 - 1
internal/schema/binary/converter.go

@@ -24,7 +24,7 @@ type Converter struct {
 
 var converter = &Converter{}
 
-func GetConverter() (*Converter, error) {
+func GetConverter() (message.Converter, error) {
 	return converter, nil
 }
 

internal/schema/binary/converter_test.go → internal/converter/binary/converter_test.go


+ 42 - 0
internal/converter/converter.go

@@ -0,0 +1,42 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package converter
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/converter/binary"
+	"github.com/lf-edge/ekuiper/internal/converter/json"
+	"github.com/lf-edge/ekuiper/pkg/message"
+)
+
+type Instantiator func(t string, schemaFile string, schemaId string) (message.Converter, error)
+
+var ( // init once and read only
+	converters = map[string]Instantiator{
+		message.FormatJson: func(t string, schemaFile string, schemaId string) (message.Converter, error) {
+			return json.GetConverter()
+		},
+		message.FormatBinary: func(t string, schemaFile string, schemaId string) (message.Converter, error) {
+			return binary.GetConverter()
+		},
+	}
+)
+
+func GetOrCreateConverter(t string, schemaFile string, schemaId string) (message.Converter, error) {
+	if c, ok := converters[t]; ok {
+		return c(t, schemaFile, schemaId)
+	}
+	return nil, fmt.Errorf("format type %s not supported", t)
+}

+ 35 - 0
internal/converter/ext_protobuf.go

@@ -0,0 +1,35 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build schema || !core
+// +build schema !core
+
+package converter
+
+import (
+	"github.com/lf-edge/ekuiper/internal/converter/protobuf"
+	"github.com/lf-edge/ekuiper/internal/pkg/def"
+	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/pkg/message"
+)
+
+func init() {
+	converters[message.FormatProtobuf] = func(t string, schemaFile string, schemaId string) (message.Converter, error) {
+		fileName, err := schema.GetSchemaFile(def.SchemaType(t), schemaFile)
+		if err != nil {
+			return nil, err
+		}
+		return protobuf.NewConverter(schemaId, fileName)
+	}
+}

+ 2 - 1
internal/schema/json/converter.go

@@ -16,6 +16,7 @@ package json
 
 import (
 	"encoding/json"
+	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
 type Converter struct {
@@ -23,7 +24,7 @@ type Converter struct {
 
 var converter = &Converter{}
 
-func GetConverter() (*Converter, error) {
+func GetConverter() (message.Converter, error) {
 	return converter, nil
 }
 

internal/schema/json/converter_test.go → internal/converter/json/converter_test.go


internal/schema/protobuf/converter.go → internal/converter/protobuf/converter.go


+ 2 - 2
internal/schema/protobuf/converter_test.go

@@ -22,7 +22,7 @@ import (
 )
 
 func TestEncode(t *testing.T) {
-	c, err := NewConverter("test1.Person", "../test/test1.proto")
+	c, err := NewConverter("test1.Person", "../../schema/test/test1.proto")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -59,7 +59,7 @@ func TestEncode(t *testing.T) {
 }
 
 func TestDecode(t *testing.T) {
-	c, err := NewConverter("test1.Person", "../test/test1.proto")
+	c, err := NewConverter("test1.Person", "../../schema/test/test1.proto")
 	if err != nil {
 		t.Fatal(err)
 	}

internal/schema/protobuf/fieldConverterSingleton.go → internal/converter/protobuf/fieldConverterSingleton.go


+ 2 - 2
internal/schema/registry.go

@@ -126,7 +126,7 @@ func CreateOrUpdateSchema(info *Info) error {
 }
 
 func GetSchema(schemaType def.SchemaType, name string) (*Info, error) {
-	schemaFile, err := getSchemaFile(schemaType, name)
+	schemaFile, err := GetSchemaFile(schemaType, name)
 	if err != nil {
 		return nil, err
 	}
@@ -142,7 +142,7 @@ func GetSchema(schemaType def.SchemaType, name string) (*Info, error) {
 	}, nil
 }
 
-func getSchemaFile(schemaType def.SchemaType, name string) (string, error) {
+func GetSchemaFile(schemaType def.SchemaType, name string) (string, error) {
 	registry.RLock()
 	defer registry.RUnlock()
 	if _, ok := registry.schemas[schemaType]; !ok {

+ 0 - 22
internal/schema/schema.go

@@ -15,12 +15,7 @@
 package schema
 
 import (
-	"fmt"
 	"github.com/lf-edge/ekuiper/internal/pkg/def"
-	"github.com/lf-edge/ekuiper/internal/schema/binary"
-	"github.com/lf-edge/ekuiper/internal/schema/json"
-	"github.com/lf-edge/ekuiper/internal/schema/protobuf"
-	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
 type Info struct {
@@ -35,20 +30,3 @@ var (
 		def.PROTOBUF: ".proto",
 	}
 )
-
-func GetOrCreateConverter(t string, schemaFile string, schemaId string) (message.Converter, error) {
-	switch t {
-	case message.FormatJson:
-		return json.GetConverter()
-	case message.FormatBinary:
-		return binary.GetConverter()
-	case message.FormatProtobuf:
-		fileName, err := getSchemaFile(def.SchemaType(t), schemaFile)
-		if err != nil {
-			return nil, err
-		}
-		return protobuf.NewConverter(schemaId, fileName)
-	default:
-		return nil, fmt.Errorf("unsupported schema type: %s", t)
-	}
-}

+ 1 - 1
internal/service/schema.go

@@ -21,7 +21,7 @@ import (
 	"github.com/jhump/protoreflect/desc/protoparse"
 	"github.com/jhump/protoreflect/dynamic"
 	kconf "github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/schema/protobuf"
+	"github.com/lf-edge/ekuiper/internal/converter/protobuf"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	_ "google.golang.org/genproto/googleapis/api/annotations"

+ 3 - 3
internal/topo/node/source_node.go

@@ -17,7 +17,7 @@ package node
 import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/internal/converter"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -103,9 +103,9 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 				}
 				schemaFile = r[0]
 			}
-			converter, err := schema.GetOrCreateConverter(format, schemaFile, schemaId)
+			converter, err := converter.GetOrCreateConverter(format, schemaFile, schemaId)
 			if err != nil {
-				msg := fmt.Sprintf("cannot get converter from format %s, schemaId %s", format, schemaId)
+				msg := fmt.Sprintf("cannot get converter from format %s, schemaId %s: %v", format, schemaId, err)
 				logger.Warnf(msg)
 				return fmt.Errorf(msg)
 			}

+ 2 - 2
internal/topo/operator/preprocessor_test.go

@@ -20,7 +20,7 @@ import (
 	"errors"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/internal/converter"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/ast"
@@ -1114,7 +1114,7 @@ func TestPreprocessorForBinary(t *testing.T) {
 		if tt.isBinary {
 			format = message.FormatBinary
 		}
-		converter, _ := schema.GetOrCreateConverter(format, "", "")
+		converter, _ := converter.GetOrCreateConverter(format, "", "")
 		nCtx := context.WithValue(ctx, context.DecodeKey, converter)
 		if dm, e := nCtx.Decode(tt.data); e != nil {
 			log.Fatal(e)

+ 2 - 2
internal/topo/transform/template.go

@@ -19,7 +19,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/internal/converter"
 	"github.com/lf-edge/ekuiper/pkg/message"
 	"strings"
 	"text/template"
@@ -39,7 +39,7 @@ func GenTransform(dt string, format string, schemaId string) (TransFunc, error)
 		if len(r) != 2 {
 			return nil, fmt.Errorf("invalid schemaId: %s", schemaId)
 		}
-		c, err = schema.GetOrCreateConverter(message.FormatProtobuf, r[0], schemaId)
+		c, err = converter.GetOrCreateConverter(message.FormatProtobuf, r[0], schemaId)
 		if err != nil {
 			return nil, err
 		}