Selaa lähdekoodia

refactor(service): remove msgpack from standard package

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 1 vuosi sitten
vanhempi
commit
e0905cac02

+ 1 - 1
.github/workflows/run_test_case.yaml

@@ -62,7 +62,7 @@ jobs:
         cd ../../../../
         cp -r sdk/python/example/pysam plugins/portable/pysam
         cp -r sdk/python/ekuiper plugins/portable/pysam/
-        go test --tags="edgex script test" --cover -covermode=atomic -coverpkg=./... -coverprofile=coverage.out $(go list ./... | grep -v "github.com/lf-edge/ekuiper/internal/topo/topotest/plugin")
+        go test --tags="edgex msgpack script test" --cover -covermode=atomic -coverpkg=./... -coverprofile=coverage.out $(go list ./... | grep -v "github.com/lf-edge/ekuiper/internal/topo/topotest/plugin")
     - name: Upload coverage to Codecov
       uses: codecov/codecov-action@v3
       with:

+ 3 - 3
docs/en_US/extension/external/external_func.md

@@ -15,7 +15,7 @@ The json configuration file includes the following two parts:
 
 - about: Used to describe the Meta-information of service, including author, detailed description, help document url, etc. For detailed usage, please refer to the example below.
 - interfaces: Used to define a set of service interfaces. Services provided by the same server often have the same service address and can be used as a service interface. Each service interface contains the following attributes:
-    - protocol: The protocol used by the service. "grpc", "rest" and "msgpack-rpc" are supported currently.
+    - protocol: The protocol used by the service. "grpc", "rest" are supported currently. The "msgpack-rpc" is not built by default, you need to build it with build tag "msgpack" by yourself. Please refer to [feature compilation](../../operation/compile/features.md#usage) for detail.
     - address: Service address, which must be url. For example, typical rpc service address: "tcp://localhost:50000" or http service address "https://localhost:8000".
     - schemaType: The type of service description file. Only "protobuf" is supported currently .
     - schemaFile: service description file, currently only proto file is supported. The rest and msgpack services also need to be described in proto.
@@ -93,7 +93,7 @@ This file defines the sample service, which contains the call information of 3 s
 
 - trueno: grpc service
 - tsrest: rest service
-- tsrpc: msgpack-rpc service
+- tsrpc: msgpack-rpc service (not built by default)
 
 The service provided by each service interface is defined by its corresponding schema file. Taking tsrest as an example, its schema file is tsrest.proto, which is defined as follows:
 
@@ -205,7 +205,7 @@ Notice that, in REST call the parameters will be parsed to json.  Proto message
 
 ### Notification
 
-Since REST and msgpack-rpc are not natively defined by protobuf, there are some  limitations when using them.
+Since REST and msgpack-rpc are not natively defined by protobuf, there are some limitations when using them.
 
 The REST service is **POST** by default currently, and the transmission format is json. The user can change the default method through [http options](#http-options) in the defined protobuf. There are some restricitons in rest service:
 

+ 1 - 0
docs/en_US/operation/compile/features.md

@@ -12,6 +12,7 @@ Except core runtime and REST api, there are some features that are allowed to be
 | [Native plugin](../../extension/native/overview.md)                                               | plugin     | The native plugin runtime, REST API, CLI API etc.                                                                                                      |
 | [Portable plugin](../../extension/portable/overview.md)                                           | portable   | The portable plugin runtime, REST API, CLI API etc.                                                                                                    |
 | [External service](../../extension/external/external_func.md)                                     | service    | The external service runtime, REST API, CLI API etc.                                                                                                   |
+| [Msgpack-rpc External service](../../extension/external/external_func.md)                         | msgpack    | Support msgpack-rpc protocol in external service                                                                                                       |
 | [UI Meta API](../../operation/manager-ui/overview.md)                                             | ui         | The REST API of the metadata which is usually consumed by the ui                                                                                       |
 | [Prometheus Metrics](../../configuration/global_configurations.md#prometheus-configuration)       | prometheus | Support to send metrics to prometheus                                                                                                                  |
 | [Extended template functions](../../guide/sinks/data_template.md#functions-supported-in-template) | template   | Support additional data template function from sprig besides default go text/template functions                                                        |

+ 1 - 1
docs/zh_CN/extension/external/external_func.md

@@ -14,7 +14,7 @@ json 配置文件包括以下两个部分:
 
 - about: 用于描述服务的元信息,包括作者,详细描述,帮助文档 url 等。详细用法请参考下面的范例。
 - interfaces: 用于定义一组服务接口。同一个服务器提供的服务往往具有相同的服务地址,可作为一个服务接口。每一个服务接口包含下列属性:
-    - protocol: 服务采用的协议。目前支持 "grpc", "rest" 和 "msgpack-rpc"
+    - protocol: 服务采用的协议。目前支持 "grpc", "rest"。需要注意的是, "msgpack-rpc" 服务没有编译到默认的 eKuiper 中,需要添加 build tag "msgpack" 并自行编译。详情请参考[按需编译](../../operation/compile/features.md#使用)。
     - adddress: 服务地址,必须为 url。例如,典型 rpc 服务地址:"tcp://localhost:50000" 或者 http 服务地址 "https://localhost:8000"。
     - schemaType: 服务描述文件类型。目前仅支持 "protobuf"。
     - schemaFile: 服务描述文件,目前仅支持 proto 文件。rest 和 msgpack 服务也需要采用 proto 描述。

+ 1 - 1
docs/zh_CN/operation/compile/features.md

@@ -17,7 +17,7 @@
 | [扩展模板函数](../../guide/sinks/data_template.md#模版中支持的函数)                       | template   | 支持除 go 语言默认的模板函数之外的扩展函数,主要来自 sprig                           |
 | [有模式编解码](../../guide/serialization/serialization.md)                        | schema     | 支持模式注册及有模式的编解码格式,例如 protobuf                                 |
 
-## Usage
+## 使用
 
 Makefile 里已经提供了三种功能集合:标准,edgeX和核心。标准功能集合包含除了 EdgeX 之外的所有功能。edgeX 功能集合包含了所有的功能;而核心功能集合近包含最小的核心功能。可以通过以下命令,分别编译这三种功能集合:
 

+ 43 - 99
internal/service/executors.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -19,20 +19,15 @@ import (
 	"crypto/tls"
 	"fmt"
 	"io"
-	"net"
 	"net/http"
-	"net/rpc"
 	"net/url"
-	"reflect"
 	"strings"
-	"sync"
 	"time"
 
 	// TODO: replace with `google.golang.org/protobuf/proto` pkg.
 	"github.com/golang/protobuf/proto" //nolint:staticcheck
 	"github.com/jhump/protoreflect/dynamic"
 	"github.com/jhump/protoreflect/dynamic/grpcdynamic"
-	"github.com/ugorji/go/codec"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 
@@ -42,8 +37,45 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/infra"
 )
 
+type exeIns func(desc descriptor, opt *interfaceOpt, i *interfaceInfo) (executor, error)
+
+var executors = map[protocol]exeIns{
+	GRPC: newGrpcExecutor,
+	REST: newHttpExecutor,
+}
+
+func newHttpExecutor(desc descriptor, opt *interfaceOpt, i *interfaceInfo) (executor, error) {
+	d, ok := desc.(multiplexDescriptor)
+	if !ok {
+		return nil, fmt.Errorf("invalid descriptor type for rest")
+	}
+	o := &restOption{}
+	e := cast.MapToStruct(i.Options, o)
+	if e != nil {
+		return nil, fmt.Errorf("incorrect rest option: %v", e)
+	}
+	exe := &httpExecutor{
+		descriptor:   d,
+		interfaceOpt: opt,
+		restOpt:      o,
+	}
+	return exe, nil
+}
+
+func newGrpcExecutor(desc descriptor, opt *interfaceOpt, _ *interfaceInfo) (executor, error) {
+	d, ok := desc.(protoDescriptor)
+	if !ok {
+		return nil, fmt.Errorf("invalid descriptor type for grpc")
+	}
+	exe := &grpcExecutor{
+		descriptor:   d,
+		interfaceOpt: opt,
+	}
+	return exe, nil
+}
+
 // NewExecutor
-// Each interface definition maps to one executor instance. It is suppose to have only one thread running.
+// Each interface definition maps to one executor instance. It is supposed to have only one thread running.
 func NewExecutor(i *interfaceInfo) (executor, error) {
 	// No validation here, suppose the validation has been done in json parsing
 	descriptor, err := parse(i.Schema.SchemaType, i.Schema.SchemaFile)
@@ -58,44 +90,10 @@ func NewExecutor(i *interfaceInfo) (executor, error) {
 		addr:    u,
 		timeout: 5000,
 	}
-	switch i.Protocol {
-	case GRPC:
-		d, ok := descriptor.(protoDescriptor)
-		if !ok {
-			return nil, fmt.Errorf("invalid descriptor type for grpc")
-		}
-		exe := &grpcExecutor{
-			descriptor:   d,
-			interfaceOpt: opt,
-		}
-		return exe, nil
-	case REST:
-		d, ok := descriptor.(multiplexDescriptor)
-		if !ok {
-			return nil, fmt.Errorf("invalid descriptor type for rest")
-		}
-		o := &restOption{}
-		e := cast.MapToStruct(i.Options, o)
-		if e != nil {
-			return nil, fmt.Errorf("incorrect rest option: %v", e)
-		}
-		exe := &httpExecutor{
-			descriptor:   d,
-			interfaceOpt: opt,
-			restOpt:      o,
-		}
-		return exe, nil
-	case MSGPACK:
-		d, ok := descriptor.(interfaceDescriptor)
-		if !ok {
-			return nil, fmt.Errorf("invalid descriptor type for msgpack-rpc")
-		}
-		exe := &msgpackExecutor{
-			descriptor:   d,
-			interfaceOpt: opt,
-		}
-		return exe, nil
-	default:
+
+	if ins, ok := executors[i.Protocol]; ok {
+		return ins(descriptor, opt, i)
+	} else {
 		return nil, fmt.Errorf("unsupported protocol %s", i.Protocol)
 	}
 }
@@ -238,57 +236,3 @@ func (h *httpExecutor) InvokeFunction(ctx api.FunctionContext, name string, para
 		}
 	}
 }
-
-type msgpackExecutor struct {
-	descriptor interfaceDescriptor
-	*interfaceOpt
-
-	sync.Mutex
-	connected bool
-	conn      *rpc.Client
-}
-
-// InvokeFunction flat the params and result
-func (m *msgpackExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
-	if !m.connected {
-		m.Lock()
-		if !m.connected {
-			h := &codec.MsgpackHandle{}
-			h.MapType = reflect.TypeOf(map[string]interface{}(nil))
-
-			conn, err := net.Dial(m.addr.Scheme, m.addr.Host)
-			if err != nil {
-				return nil, err
-			}
-			rpcCodec := codec.MsgpackSpecRpc.ClientCodec(conn, h)
-			m.conn = rpc.NewClientWithCodec(rpcCodec)
-		}
-		m.connected = true
-		m.Unlock()
-	}
-	ps, err := m.descriptor.ConvertParams(name, params)
-	if err != nil {
-		return nil, err
-	}
-	var (
-		reply interface{}
-		args  interface{}
-	)
-	// TODO argument flat
-	switch len(ps) {
-	case 0:
-		// do nothing
-	case 1:
-		args = ps[0]
-	default:
-		args = codec.MsgpackSpecRpcMultiArgs(ps)
-	}
-	err = m.conn.Call(name, args, &reply)
-	if err != nil {
-		if err == rpc.ErrShutdown {
-			m.connected = false
-		}
-		return nil, err
-	}
-	return m.descriptor.ConvertReturn(name, reply)
-}

+ 96 - 0
internal/service/executors_msgpack.go

@@ -0,0 +1,96 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build msgpack
+
+package service
+
+import (
+	"fmt"
+	"github.com/ugorji/go/codec"
+	"net"
+	"net/rpc"
+	"reflect"
+	"sync"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+func init() {
+	executors[MSGPACK] = func(desc descriptor, opt *interfaceOpt, _ *interfaceInfo) (executor, error) {
+		d, ok := desc.(interfaceDescriptor)
+		if !ok {
+			return nil, fmt.Errorf("invalid descriptor type for msgpack-rpc")
+		}
+		exe := &msgpackExecutor{
+			descriptor:   d,
+			interfaceOpt: opt,
+		}
+		return exe, nil
+	}
+}
+
+type msgpackExecutor struct {
+	descriptor interfaceDescriptor
+	*interfaceOpt
+
+	sync.Mutex
+	connected bool
+	conn      *rpc.Client
+}
+
+// InvokeFunction flat the params and result
+func (m *msgpackExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
+	if !m.connected {
+		m.Lock()
+		if !m.connected {
+			h := &codec.MsgpackHandle{}
+			h.MapType = reflect.TypeOf(map[string]interface{}(nil))
+
+			conn, err := net.Dial(m.addr.Scheme, m.addr.Host)
+			if err != nil {
+				return nil, err
+			}
+			rpcCodec := codec.MsgpackSpecRpc.ClientCodec(conn, h)
+			m.conn = rpc.NewClientWithCodec(rpcCodec)
+		}
+		m.connected = true
+		m.Unlock()
+	}
+	ps, err := m.descriptor.ConvertParams(name, params)
+	if err != nil {
+		return nil, err
+	}
+	var (
+		reply interface{}
+		args  interface{}
+	)
+	// TODO argument flat
+	switch len(ps) {
+	case 0:
+		// do nothing
+	case 1:
+		args = ps[0]
+	default:
+		args = codec.MsgpackSpecRpcMultiArgs(ps)
+	}
+	err = m.conn.Call(name, args, &reply)
+	if err != nil {
+		if err == rpc.ErrShutdown {
+			m.connected = false
+		}
+		return nil, err
+	}
+	return m.descriptor.ConvertReturn(name, reply)
+}

+ 156 - 0
internal/service/executors_msgpack_test.go

@@ -0,0 +1,156 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build msgpack
+
+package service
+
+import (
+	"github.com/msgpack-rpc/msgpack-rpc-go/rpc"
+	"net"
+	"reflect"
+	"testing"
+
+	"github.com/lf-edge/ekuiper/internal/topo/topotest"
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+func TestMsgpackService(t *testing.T) {
+	// mock server
+	res := Resolver{"SayHello": reflect.ValueOf(SayHello), "object_detection": reflect.ValueOf(object_detection), "get_feature": reflect.ValueOf(get_feature), "getStatus": reflect.ValueOf(getStatus)}
+	serv := rpc.NewServer(res, true, nil)
+	l, _ := net.Listen("tcp", ":50000")
+	serv.Listen(l)
+	go serv.Run()
+	// Comment out because the bug in the msgpack rpc
+	// defer serv.Stop()
+
+	// Reset
+	streamList := []string{"helloStr", "commands", "fakeBin"}
+	topotest.HandleStream(false, streamList, t)
+	// Data setup
+	tests := []topotest.RuleTest{
+		{
+			Name: `TestRestRule1`,
+			Sql:  `SELECT helloFromMsgpack(name) as wc FROM helloStr`,
+			R: [][]map[string]interface{}{
+				{{
+					"wc": map[string]interface{}{
+						"message": "world",
+					},
+				}},
+				{{
+					"wc": map[string]interface{}{
+						"message": "golang",
+					},
+				}},
+				{{
+					"wc": map[string]interface{}{
+						"message": "peacock",
+					},
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(3),
+				"op_2_project_0_records_out_total":  int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
+			},
+		}, {
+			Name: `TestRestRule2`,
+			Sql:  `SELECT objectDetectFromMsgpack(*)->result FROM commands`,
+			R: [][]map[string]interface{}{
+				{{
+					"kuiper_field_0": "get success",
+				}},
+				{{
+					"kuiper_field_0": "detect success",
+				}},
+				{{
+					"kuiper_field_0": "delete success",
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(3),
+				"op_2_project_0_records_out_total":  int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
+			},
+		}, {
+			Name: `TestRestRule3`,
+			Sql:  `SELECT getFeatureFromMsgpack(self)->feature[0]->box->h FROM fakeBin`,
+			R: [][]map[string]interface{}{
+				{{
+					"kuiper_field_0": float64(106), // Convert by the testing tool
+				}},
+				{{
+					"kuiper_field_0": float64(107),
+				}},
+				{{
+					"kuiper_field_0": float64(108),
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(3),
+				"op_2_project_0_records_out_total":  int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
+			},
+			//}, {
+			//	Name: `TestRestRule4`,
+			//	Sql:  `SELECT getStatusFromMsgpack(), command FROM commands`,
+			//	R: [][]map[string]interface{}{
+			//		{{
+			//			"getStatusFromRest": true,
+			//			"command": "get",
+			//		}},
+			//		{{
+			//			"getStatusFromRest": true,
+			//			"command": "detect",
+			//		}},
+			//		{{
+			//			"getStatusFromRest": true,
+			//			"command": "delete",
+			//		}},
+			//	},
+			//	M: map[string]interface{}{
+			//		"op_2_project_0_exceptions_total":   int64(0),
+			//		"op_2_project_0_process_latency_us": int64(0),
+			//		"op_2_project_0_records_in_total":   int64(3),
+			//		"op_2_project_0_records_out_total":  int64(3),
+			//
+			//		"sink_mockSink_0_exceptions_total":  int64(0),
+			//		"sink_mockSink_0_records_in_total":  int64(3),
+			//		"sink_mockSink_0_records_out_total": int64(3),
+			//	},
+		},
+	}
+	topotest.HandleStream(true, streamList, t)
+	topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
+		BufferLength: 100,
+		SendError:    true,
+	}, 0)
+}

+ 0 - 130
internal/service/external_service_rule_test.go

@@ -29,7 +29,6 @@ import (
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/golang/protobuf/ptypes/wrappers"
 	"github.com/gorilla/mux"
-	"github.com/msgpack-rpc/msgpack-rpc-go/rpc"
 	"google.golang.org/grpc"
 
 	kconf "github.com/lf-edge/ekuiper/internal/conf"
@@ -581,135 +580,6 @@ func getStatus() bool {
 	return true
 }
 
-func TestMsgpackService(t *testing.T) {
-	// mock server
-	res := Resolver{"SayHello": reflect.ValueOf(SayHello), "object_detection": reflect.ValueOf(object_detection), "get_feature": reflect.ValueOf(get_feature), "getStatus": reflect.ValueOf(getStatus)}
-	serv := rpc.NewServer(res, true, nil)
-	l, _ := net.Listen("tcp", ":50000")
-	serv.Listen(l)
-	go serv.Run()
-	// Comment out because the bug in the msgpack rpc
-	// defer serv.Stop()
-
-	// Reset
-	streamList := []string{"helloStr", "commands", "fakeBin"}
-	topotest.HandleStream(false, streamList, t)
-	// Data setup
-	tests := []topotest.RuleTest{
-		{
-			Name: `TestRestRule1`,
-			Sql:  `SELECT helloFromMsgpack(name) as wc FROM helloStr`,
-			R: [][]map[string]interface{}{
-				{{
-					"wc": map[string]interface{}{
-						"message": "world",
-					},
-				}},
-				{{
-					"wc": map[string]interface{}{
-						"message": "golang",
-					},
-				}},
-				{{
-					"wc": map[string]interface{}{
-						"message": "peacock",
-					},
-				}},
-			},
-			M: map[string]interface{}{
-				"op_2_project_0_exceptions_total":   int64(0),
-				"op_2_project_0_process_latency_us": int64(0),
-				"op_2_project_0_records_in_total":   int64(3),
-				"op_2_project_0_records_out_total":  int64(3),
-
-				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(3),
-				"sink_mockSink_0_records_out_total": int64(3),
-			},
-		}, {
-			Name: `TestRestRule2`,
-			Sql:  `SELECT objectDetectFromMsgpack(*)->result FROM commands`,
-			R: [][]map[string]interface{}{
-				{{
-					"kuiper_field_0": "get success",
-				}},
-				{{
-					"kuiper_field_0": "detect success",
-				}},
-				{{
-					"kuiper_field_0": "delete success",
-				}},
-			},
-			M: map[string]interface{}{
-				"op_2_project_0_exceptions_total":   int64(0),
-				"op_2_project_0_process_latency_us": int64(0),
-				"op_2_project_0_records_in_total":   int64(3),
-				"op_2_project_0_records_out_total":  int64(3),
-
-				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(3),
-				"sink_mockSink_0_records_out_total": int64(3),
-			},
-		}, {
-			Name: `TestRestRule3`,
-			Sql:  `SELECT getFeatureFromMsgpack(self)->feature[0]->box->h FROM fakeBin`,
-			R: [][]map[string]interface{}{
-				{{
-					"kuiper_field_0": float64(106), // Convert by the testing tool
-				}},
-				{{
-					"kuiper_field_0": float64(107),
-				}},
-				{{
-					"kuiper_field_0": float64(108),
-				}},
-			},
-			M: map[string]interface{}{
-				"op_2_project_0_exceptions_total":   int64(0),
-				"op_2_project_0_process_latency_us": int64(0),
-				"op_2_project_0_records_in_total":   int64(3),
-				"op_2_project_0_records_out_total":  int64(3),
-
-				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(3),
-				"sink_mockSink_0_records_out_total": int64(3),
-			},
-			//}, {
-			//	Name: `TestRestRule4`,
-			//	Sql:  `SELECT getStatusFromMsgpack(), command FROM commands`,
-			//	R: [][]map[string]interface{}{
-			//		{{
-			//			"getStatusFromRest": true,
-			//			"command": "get",
-			//		}},
-			//		{{
-			//			"getStatusFromRest": true,
-			//			"command": "detect",
-			//		}},
-			//		{{
-			//			"getStatusFromRest": true,
-			//			"command": "delete",
-			//		}},
-			//	},
-			//	M: map[string]interface{}{
-			//		"op_2_project_0_exceptions_total":   int64(0),
-			//		"op_2_project_0_process_latency_us": int64(0),
-			//		"op_2_project_0_records_in_total":   int64(3),
-			//		"op_2_project_0_records_out_total":  int64(3),
-			//
-			//		"sink_mockSink_0_exceptions_total":  int64(0),
-			//		"sink_mockSink_0_records_in_total":  int64(3),
-			//		"sink_mockSink_0_records_out_total": int64(3),
-			//	},
-		},
-	}
-	topotest.HandleStream(true, streamList, t)
-	topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
-		BufferLength: 100,
-		SendError:    true,
-	}, 0)
-}
-
 type server struct {
 	UnimplementedGreeterServer
 }