Browse Source

feat(schema): schema infer from format mechanism

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 years ago
parent
commit
85cea5e93f

+ 1 - 0
.github/workflows/run_test_case.yaml

@@ -51,6 +51,7 @@ jobs:
         go build -modfile extensions.mod --buildmode=plugin -o plugins/functions/CountPlusOne@v1.0.0.so extensions/functions/countPlusOne/countPlusOne.go
         go build -modfile extensions.mod --buildmode=plugin -o plugins/functions/AccumulateWordCount@v1.0.0.so extensions/functions/accumulateWordCount/accumulateWordCount.go
         go build --buildmode=plugin -o data/test/helloworld.so internal/converter/protobuf/test/*.go
+        go build --buildmode=plugin -o data/test/myFormat.so internal/converter/custom/test/*.go
         mkdir -p plugins/portable/mirror
         cd sdk/go/example/mirror
         go build -o ../../../../plugins/portable/mirror/mirror .

+ 21 - 10
internal/converter/custom/converter_test.go

@@ -43,11 +43,11 @@ func TestCustomConverter(t *testing.T) {
 	}()
 	// build the so file into data/test prior to running the test
 	//Copy the helloworld.so
-	bytesRead, err := os.ReadFile(filepath.Join(dataDir, "helloworld.so"))
+	bytesRead, err := os.ReadFile(filepath.Join(dataDir, "myFormat.so"))
 	if err != nil {
 		t.Fatal(err)
 	}
-	err = os.WriteFile(filepath.Join(etcDir, "helloworld.so"), bytesRead, 0755)
+	err = os.WriteFile(filepath.Join(etcDir, "myFormat.so"), bytesRead, 0755)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -57,7 +57,7 @@ func TestCustomConverter(t *testing.T) {
 }
 
 func testEncode(t *testing.T) {
-	c, err := LoadConverter("custom", "helloworld", "HelloReply")
+	c, err := LoadConverter("custom", "myFormat", "Sample")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -68,14 +68,25 @@ func testEncode(t *testing.T) {
 	}{
 		{
 			m: map[string]interface{}{
-				"message": "test",
+				"id":   12,
+				"name": "test",
 			},
-			r: []byte{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74},
+			r: []byte(`{"id":12,"name":"test"}`),
 		}, {
 			m: map[string]interface{}{
-				"message": "another test 2",
+				"id":   7,
+				"name": "John Doe",
+				"age":  22,
+				"hobbies": map[string]interface{}{
+					"indoor": []string{
+						"Chess",
+					},
+					"outdoor": []string{
+						"Basketball",
+					},
+				},
 			},
-			r: []byte{0x0a, 0x0e, 0x61, 0x6e, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x20, 0x74, 0x65, 0x73, 0x74, 0x20, 0x32},
+			r: []byte(`{"age":22,"hobbies":{"indoor":["Chess"],"outdoor":["Basketball"]},"id":7,"name":"John Doe"}`),
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -84,13 +95,13 @@ func testEncode(t *testing.T) {
 		if !reflect.DeepEqual(tt.e, testx.Errstring(err)) {
 			t.Errorf("%d.error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.e, err)
 		} else if tt.e == "" && !reflect.DeepEqual(tt.r, a) {
-			t.Errorf("%d. \n\nresult mismatch:\n\nexp=%x\n\ngot=%x\n\n", i, tt.r, a)
+			t.Errorf("%d. \n\nresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.r, a)
 		}
 	}
 }
 
 func testDecode(t *testing.T) {
-	c, err := LoadConverter("custom", "helloworld", "HelloRequest")
+	c, err := LoadConverter("custom", "myFormat", "Sample")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -103,7 +114,7 @@ func testDecode(t *testing.T) {
 			m: map[string]interface{}{
 				"name": "test",
 			},
-			r: []byte{0x0a, 0x04, 0x74, 0x65, 0x73, 0x74},
+			r: []byte(`{"name":"test"}`),
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 83 - 0
internal/converter/custom/test/myformat.go

@@ -0,0 +1,83 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+type Hobbies struct {
+	Indoor  []string `json:"indoor"`
+	Outdoor []string `json:"outdoor"`
+}
+
+type Sample struct {
+	Id      int64   `json:"id"`
+	Name    string  `json:"name"`
+	Age     int64   `json:"age"`
+	Hobbies Hobbies `json:"hobbies"`
+}
+
+func (x *Sample) GetSchemaJson() string {
+	// return a static schema
+	return `{
+		"id": {
+			"type": "bigint"
+	},
+		"name": {
+			"type": "string"
+	},
+		"age": {
+			"type": "bigint"
+	},
+		"hobbies": {
+			"type": "struct",
+			"properties": {
+			"indoor": {
+				"type": "array",
+					"items": {
+						"type": "string"
+				}
+			},
+			"outdoor": {
+				"type": "array",
+					"items": {
+						"type": "string"
+				}
+			}
+		}
+	}
+	}`
+}
+
+func (x *Sample) Encode(d interface{}) ([]byte, error) {
+	switch r := d.(type) {
+	case map[string]interface{}:
+		return json.Marshal(r)
+	default:
+		return nil, fmt.Errorf("unsupported type %v, must be a map", d)
+	}
+}
+
+func (x *Sample) Decode(b []byte) (interface{}, error) {
+	result := make(map[string]interface{})
+	err := json.Unmarshal(b, &result)
+	return result, err
+}
+
+func GetSample() interface{} {
+	return &Sample{}
+}

+ 127 - 0
internal/schema/ext_inferer_protobuf.go

@@ -0,0 +1,127 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build schema || !core
+
+package schema
+
+import (
+	"fmt"
+	dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
+	"github.com/jhump/protoreflect/desc"
+	"github.com/jhump/protoreflect/desc/protoparse"
+	"github.com/lf-edge/ekuiper/internal/pkg/def"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/message"
+)
+
+var protoParser *protoparse.Parser
+
+func init() {
+	inferes[message.FormatProtobuf] = InferProtobuf
+	protoParser = &protoparse.Parser{}
+}
+
+// InferProtobuf infers the schema from a protobuf file dynamically in case the schema file changed
+func InferProtobuf(schemaFile string, messageName string) (ast.StreamFields, error) {
+	ffs, err := GetSchemaFile(def.PROTOBUF, schemaFile)
+	if err != nil {
+		return nil, err
+	}
+	if fds, err := protoParser.ParseFiles(ffs.SchemaFile); err != nil {
+		return nil, fmt.Errorf("parse schema file %s failed: %s", ffs.SchemaFile, err)
+	} else {
+		messageDescriptor := fds[0].FindMessage(messageName)
+		if messageDescriptor == nil {
+			return nil, fmt.Errorf("message type %s not found in schema file %s", messageName, schemaFile)
+		}
+		return convertMessage(messageDescriptor)
+	}
+}
+
+func convertMessage(m *desc.MessageDescriptor) (ast.StreamFields, error) {
+	mfs := m.GetFields()
+	result := make(ast.StreamFields, 0, len(mfs))
+	for _, f := range mfs {
+		ff, err := convertField(f)
+		if err != nil {
+			return nil, err
+		}
+		result = append(result, ff)
+	}
+	return result, nil
+}
+
+func convertField(f *desc.FieldDescriptor) (ast.StreamField, error) {
+	ff := ast.StreamField{
+		Name: f.GetName(),
+	}
+	var (
+		ft  ast.FieldType
+		err error
+	)
+	ft, err = convertFieldType(f.GetType(), f)
+	if err != nil {
+		return ff, err
+	}
+	if f.IsRepeated() {
+		switch t := ft.(type) {
+		case *ast.BasicType:
+			ft = &ast.ArrayType{
+				Type: t.Type,
+			}
+		case *ast.RecType:
+			ft = &ast.ArrayType{
+				Type:      ast.STRUCT,
+				FieldType: t,
+			}
+		case *ast.ArrayType:
+			ft = &ast.ArrayType{
+				Type:      ast.ARRAY,
+				FieldType: t,
+			}
+		}
+	}
+	ff.FieldType = ft
+	return ff, nil
+}
+
+func convertFieldType(tt dpb.FieldDescriptorProto_Type, f *desc.FieldDescriptor) (ast.FieldType, error) {
+	var ft ast.FieldType
+	switch tt {
+	case dpb.FieldDescriptorProto_TYPE_DOUBLE,
+		dpb.FieldDescriptorProto_TYPE_FLOAT:
+		ft = &ast.BasicType{Type: ast.FLOAT}
+	case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32,
+		dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64,
+		dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32,
+		dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
+		ft = &ast.BasicType{Type: ast.BIGINT}
+	case dpb.FieldDescriptorProto_TYPE_BOOL:
+		ft = &ast.BasicType{Type: ast.BOOLEAN}
+	case dpb.FieldDescriptorProto_TYPE_STRING:
+		ft = &ast.BasicType{Type: ast.STRINGS}
+	case dpb.FieldDescriptorProto_TYPE_BYTES:
+		ft = &ast.BasicType{Type: ast.BYTEA}
+	case dpb.FieldDescriptorProto_TYPE_MESSAGE:
+		sfs, err := convertMessage(f.GetMessageType())
+		if err != nil {
+			return nil, fmt.Errorf("invalid struct field type: %v", err)
+		}
+		ft = &ast.RecType{StreamFields: sfs}
+	default:
+		return nil, fmt.Errorf("invalid type for field '%s'", f.GetName())
+	}
+	return ft, nil
+}

+ 81 - 0
internal/schema/ext_inferer_protobuf_test.go

@@ -0,0 +1,81 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build schema || !core
+
+package schema
+
+import (
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/testx"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"os"
+	"path/filepath"
+	"reflect"
+	"testing"
+)
+
+func TestInferProtobuf(t *testing.T) {
+	testx.InitEnv()
+	// Move test schema file to etc dir
+	etcDir, err := conf.GetDataLoc()
+	if err != nil {
+		t.Fatal(err)
+	}
+	etcDir = filepath.Join(etcDir, "schemas", "protobuf")
+	err = os.MkdirAll(etcDir, os.ModePerm)
+	if err != nil {
+		t.Fatal(err)
+	}
+	//Copy init.proto
+	bytesRead, err := os.ReadFile("test/test1.proto")
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = os.WriteFile(filepath.Join(etcDir, "test1.proto"), bytesRead, 0755)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		err = os.RemoveAll(etcDir)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	err = InitRegistry()
+	if err != nil {
+		t.Errorf("InitRegistry error: %v", err)
+		return
+	}
+	// Test infer
+	result, err := InferProtobuf("test1", "Person")
+	if err != nil {
+		t.Errorf("InferProtobuf error: %v", err)
+		return
+	}
+	expected := ast.StreamFields{
+		{Name: "name", FieldType: &ast.BasicType{Type: ast.STRINGS}},
+		{Name: "id", FieldType: &ast.BasicType{Type: ast.BIGINT}},
+		{Name: "email", FieldType: &ast.BasicType{Type: ast.STRINGS}},
+		{Name: "code", FieldType: &ast.ArrayType{
+			Type: ast.STRUCT,
+			FieldType: &ast.RecType{StreamFields: []ast.StreamField{
+				{Name: "doubles", FieldType: &ast.ArrayType{Type: ast.FLOAT}},
+			}},
+		}},
+	}
+	if !reflect.DeepEqual(result, expected) {
+		t.Errorf("InferProtobuf result is not expected, got %v, expected %v", result, expected)
+	}
+}

+ 42 - 0
internal/schema/inferer.go

@@ -0,0 +1,42 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schema
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/message"
+	"strings"
+)
+
+type inferer func(schemaFileName string, SchemaMessageName string) (ast.StreamFields, error)
+
+var ( // init once and read only
+	inferes = map[string]inferer{
+		message.FormatCustom: InferCustom,
+	}
+)
+
+func InferFromSchemaFile(schemaType string, schemaId string) (ast.StreamFields, error) {
+	r := strings.Split(schemaId, ".")
+	if len(r) != 2 {
+		return nil, fmt.Errorf("invalid schemaId: %s", schemaId)
+	}
+	if c, ok := inferes[schemaType]; ok {
+		return c(r[0], r[1])
+	} else {
+		return nil, fmt.Errorf("unsupported type: %s", schemaType)
+	}
+}

+ 63 - 0
internal/schema/inferer_custom.go

@@ -0,0 +1,63 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schema
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/def"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/message"
+	"plugin"
+)
+
+func InferCustom(schemaFile string, messageName string) (ast.StreamFields, error) {
+	conf.Log.Infof("Load custom schema from file %s, for symbol Get%s", schemaFile, messageName)
+	ffs, err := GetSchemaFile(def.CUSTOM, schemaFile)
+	if err != nil {
+		return nil, err
+	}
+	if ffs.SoFile == "" {
+		return nil, fmt.Errorf("no so file found for custom schema %s", messageName)
+	}
+	sp, err := plugin.Open(ffs.SoFile)
+	if err != nil {
+		conf.Log.Errorf(fmt.Sprintf("custom schema file %s open error: %v", ffs.SoFile, err))
+		return nil, fmt.Errorf("cannot open %s: %v", ffs.SoFile, err)
+	}
+	nf, err := sp.Lookup("Get" + messageName)
+	if err != nil {
+		conf.Log.Warnf(fmt.Sprintf("cannot find schemaId %s, please check if it is exported: Get%v", messageName, err))
+		return nil, nil
+	}
+	nff, ok := nf.(func() interface{})
+	if !ok {
+		conf.Log.Errorf("exported symbol Get%s is not func to return interface{}", messageName)
+		return nil, fmt.Errorf("load custom schema %s, message %s error", ffs.SoFile, messageName)
+	}
+	mc, ok := nff().(message.SchemaProvider)
+	if ok {
+		sj := mc.GetSchemaJson()
+		var result ast.StreamFields
+		err := json.Unmarshal([]byte(sj), &result)
+		if err != nil {
+			return nil, fmt.Errorf("invalid schema json %s: %v", sj, err)
+		}
+		return result, nil
+	} else {
+		return nil, fmt.Errorf("get schema converter failed, exported symbol %s is not type of message.Converter", messageName)
+	}
+}

+ 74 - 0
internal/schema/inferer_custom_test.go

@@ -0,0 +1,74 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schema
+
+import (
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"os"
+	"path/filepath"
+	"testing"
+)
+
+func TestInferCustom(t *testing.T) {
+	// Prepare test schema file
+	dataDir, err := conf.GetDataLoc()
+	if err != nil {
+		t.Fatal(err)
+	}
+	etcDir := filepath.Join(dataDir, "schemas", "custom")
+	err = os.MkdirAll(etcDir, os.ModePerm)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		err = os.RemoveAll(etcDir)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	// build the so file into data/test prior to running the test
+	//Copy the helloworld.so
+	bytesRead, err := os.ReadFile(filepath.Join(dataDir, "myFormat.so"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = os.WriteFile(filepath.Join(etcDir, "myFormat.so"), bytesRead, 0755)
+	if err != nil {
+		t.Fatal(err)
+	}
+	InitRegistry()
+
+	// Test
+	result, err := InferCustom("myFormat", "Sample")
+	if err != nil {
+		t.Errorf("Infer custom format error: %v", err)
+		return
+	}
+	expected := ast.StreamFields{
+		{Name: "id", FieldType: &ast.BasicType{Type: ast.BIGINT}},
+		{Name: "name", FieldType: &ast.BasicType{Type: ast.STRINGS}},
+		{Name: "age", FieldType: &ast.BasicType{Type: ast.BIGINT}},
+		{Name: "hobbies", FieldType: &ast.RecType{
+			StreamFields: []ast.StreamField{
+				{Name: "indoor", FieldType: &ast.ArrayType{Type: ast.STRINGS}},
+				{Name: "outdoor", FieldType: &ast.ArrayType{Type: ast.STRINGS}},
+			},
+		}},
+	}
+	if len(result) != len(expected) {
+		t.Errorf("InferProtobuf result is not expected, got %v, expected %v", result, expected)
+	}
+}

+ 94 - 0
pkg/ast/sourceStmt.go

@@ -50,6 +50,12 @@ type StreamField struct {
 	FieldType
 }
 
+type JsonStreamField struct {
+	Type       string                      `json:"type"`
+	Items      *JsonStreamField            `json:"items,omitempty"`
+	Properties map[string]*JsonStreamField `json:"properties,omitempty"`
+}
+
 func (u *StreamField) MarshalJSON() ([]byte, error) {
 	return json.Marshal(&struct {
 		FieldType interface{}
@@ -60,6 +66,94 @@ func (u *StreamField) MarshalJSON() ([]byte, error) {
 	})
 }
 
+// UnmarshalJSON The json format follows json schema
+func (sf *StreamFields) UnmarshalJSON(data []byte) error {
+	temp := map[string]*JsonStreamField{}
+	err := json.Unmarshal(data, &temp)
+	if err != nil {
+		return err
+	}
+	return sf.UnmarshalFromMap(temp)
+}
+
+func (sf *StreamFields) UnmarshalFromMap(data map[string]*JsonStreamField) error {
+	t, err := fieldsTypeFromSchema(data)
+	if err != nil {
+		return err
+	}
+	*sf = t
+	return nil
+}
+
+func fieldsTypeFromSchema(mjsf map[string]*JsonStreamField) (StreamFields, error) {
+	sfs := make(StreamFields, 0, len(mjsf))
+	for k, v := range mjsf {
+		ft, err := fieldTypeFromSchema(v)
+		if err != nil {
+			return nil, err
+		}
+		sfs = append(sfs, StreamField{
+			Name:      k,
+			FieldType: ft,
+		})
+	}
+	return sfs, nil
+}
+
+func fieldTypeFromSchema(v *JsonStreamField) (FieldType, error) {
+	var ft FieldType
+	switch v.Type {
+	case "array":
+		if v.Items == nil {
+			return nil, fmt.Errorf("array field type should have items")
+		}
+		itemType, err := fieldTypeFromSchema(v.Items)
+		if err != nil {
+			return nil, fmt.Errorf("invalid array field type: %v", err)
+		}
+		switch t := itemType.(type) {
+		case *BasicType:
+			ft = &ArrayType{
+				Type: t.Type,
+			}
+		case *RecType:
+			ft = &ArrayType{
+				Type:      STRUCT,
+				FieldType: t,
+			}
+		case *ArrayType:
+			ft = &ArrayType{
+				Type:      ARRAY,
+				FieldType: t,
+			}
+		}
+	case "struct":
+		if v.Properties == nil {
+			return nil, fmt.Errorf("struct field type should have properties")
+		}
+		sfs, err := fieldsTypeFromSchema(v.Properties)
+		if err != nil {
+			return nil, fmt.Errorf("invalid struct field type: %v", err)
+		}
+		ft = &RecType{StreamFields: sfs}
+	case "bigint":
+		ft = &BasicType{Type: BIGINT}
+	case "float":
+		ft = &BasicType{Type: FLOAT}
+	case "string":
+		ft = &BasicType{Type: STRINGS}
+	case "bytea":
+		ft = &BasicType{Type: BYTEA}
+	case "datetime":
+		ft = &BasicType{Type: DATETIME}
+	case "boolean":
+		ft = &BasicType{Type: BOOLEAN}
+	default:
+		return nil, fmt.Errorf("unsupported type %s", v.Type)
+	}
+	return ft, nil
+}
+
 type StreamFields []StreamField
 
 type FieldType interface {

+ 4 - 0
pkg/message/artifacts.go

@@ -38,3 +38,7 @@ type Converter interface {
 	Encode(d interface{}) ([]byte, error)
 	Decode(b []byte) (interface{}, error)
 }
+
+type SchemaProvider interface {
+	GetSchemaJson() string
+}