Explorar o código

feat(service): support google http option

ngjaying %!s(int64=3) %!d(string=hai) anos
pai
achega
90799ce119

+ 71 - 4
docs/en_US/extension/external_func.md

@@ -121,6 +121,71 @@ This file defines the tsrest service interface to provide a service object_detec
 
 Protobuf uses proto3 format. Please refer to [proto3-spec](https://developers.google.com/protocol-buffers/docs/reference/proto3-spec) for detailed format.
 
+### HTTP Options
+
+In order to support detail configuration of the REST service, such as the http method, the url template, the params and the body, an additional mapping annotations based on grpc transcoding specification provided by *google.api.http* annotation. Users can specify a http rule for each rpc method to define the mapping of the rpc method to the http method, URL path, URL query parameters, and HTTP request body.
+
+Below is a portion of the revised tsrest.proto file in which a http rule is added. The rule specifies the http method to be *post*, and the mapping url to */v1/computation/object_detection* to override the default url */object_detection*. It also specifies the body to be a wildcard which means the whole input parameter of *ObjectDetectionRequest* will be the body.
+
+```protobuf
+service TSRest {
+  rpc object_detection(ObjectDetectionRequest) returns(ObjectDetectionResponse) {
+    option (google.api.http) = {
+      post: "/v1/computation/object_detection"
+      body: "*"
+    };
+  }
+}
+```
+
+If the object_detection rest service provides different url for different command, users can specify the url mapping with parameters as below. By this way, the input *ObjectDetectionRequest* parameter's *cmd* field is assigned to the url, and the *base64_img* field is processed as the body.
+
+```protobuf
+service TSRest {
+  rpc object_detection(ObjectDetectionRequest) returns(ObjectDetectionResponse) {
+    option (google.api.http) = {
+      post: "/v1/computation/object_detection/{cmd}"
+      body: "base64_img"
+    };
+  }
+}
+```
+
+Another typical scenario is the REST services to search a list. The search parameters are usually appended to the url as the query parameters. 
+
+```protobuf
+service TSRest {
+  rpc SearchMessage(MessageRequest) returns(Message) {
+    option (google.api.http) = {
+      get: "/v1/messages"
+    };
+  }
+}
+
+message MessageRequest {
+  string author = 1;
+  string title = 2;
+}
+```
+
+In this example, there is no *body* specified thus all parameter fields are mapped to the query parameter. When calling `SearchMessage({"author":"Author","title":"Message1"})` in Kuiper SQL, it will be mapped to `GET /v1/messages?author=Author&title=Message1`.
+
+For more detail about the mapping syntax for protobuf, please check [adding transcoding mapping](https://cloud.google.com/endpoints/docs/grpc/transcoding#adding_transcoding_mappings) and [httprule](https://cloud.google.com/endpoints/docs/grpc-service-config/reference/rpc/google.api#httprule).
+
+#### Usage
+
+To use the http options, the google api package must be imported in the proto file.
+
+```protobuf
+syntax = "proto3";
+
+package yourpackage;
+
+import "google/api/annotations.proto";
+```
+
+Thus, the google api proto files must be in the imported path. Kuiper already ship those proto files in `etc/services/schemas/google`. Users do not need to add this to the packaged customized service.
+
 ### Mapping
 
 In the external service configuration, there are 1 json file and at least 1 schema file(.proto) to define the function mapping. This will define a 3 layer mappings.
@@ -136,13 +201,14 @@ In this sample, if a user call `objectDetection` function in Kuiper SQL, the map
 
 Notice that, in REST call the parameters will be parsed to json.  Proto message field names are **converted** to lowerCamelCase and become JSON object keys. If the object keys of the REST API is not lowerCamelCase, the user must specify the json_name field option to avoid the conversion.
 
-### Limitation
+### Notification
 
 Since REST and msgpack-rpc are not natively defined by protobuf, there are some  limitations when using them.
 
-The REST service is **POST** by default currently, and the transmission format is json. In the defined protobuf:
+The REST service is **POST** by default currently, and the transmission format is json. The user can change the default method through [http options](#http-options) in the defined protobuf. There are some restricitons in rest service:
 
-- The input type must be **Message** or *google.protobuf.StringValue*. If the type is *google.protobuf.StringValue*, the parameter must be an encoded json string like `"{\"name\":\"name1\",\"size\":1}"`.
+- If http options are not specified, the input type must be **Message** or *google.protobuf.StringValue*. If the type is *google.protobuf.StringValue*, the parameter must be an encoded json string like `"{\"name\":\"name1\",\"size\":1}"`.
+- The marshalled json for int64 type will be string
 
 The msgpack-rpc service has the following limitation:
 - Input can not be empty
@@ -201,4 +267,5 @@ message ObjectDetectionRequest {
 }
 ```
 
-In Kuiper, users can pass in the entire struct as a parameter, or pass in two string parameters as cmd and base64_img respectively.
+In Kuiper, users can pass in the entire struct as a parameter, or pass in two string parameters as cmd and base64_img respectively.
+

+ 70 - 3
docs/zh_CN/extension/external_func.md

@@ -119,6 +119,72 @@ message ObjectDetectionResponse {
 
 Protobuf 采用 proto3 格式,详细格式请参考 [proto3-spec](https://developers.google.com/protocol-buffers/docs/reference/proto3-spec) 。
 
+### Http选项
+
+为了支持更细粒度的 REST 服务配置,例如配置 http 方法,URL,参数以及请求体,我们支持了基于 *google.api.http* 注解的 grpc 转码配置。在 proto 文件中,用户可通过给每个 rpc 方法添加注解的方式,配置该方法映射的 http 方法,URL 路径,URL 参数以及请求体。
+
+以下例子是修改过的 tsrest.proto 文件的一部分,添加了 http 规则的注解。例子中,注解指定了 http 方法为 *post*,映射的 url 为 */v1/computation/object_detection* 以覆盖默认的 url */object_detection*。注解中的 body 设置为 * 表示方法的类型为 *ObjectDetectionRequest* 的输入参数将完全转换为请求消息体。
+
+```protobuf
+service TSRest {
+  rpc object_detection(ObjectDetectionRequest) returns(ObjectDetectionResponse) {
+    option (google.api.http) = {
+      post: "/v1/computation/object_detection"
+      body: "*"
+    };
+  }
+}
+```
+
+假设映射的 object_detection 服务针对不同的命令提供不同的 URL,则用户可以通过指定 URL 参数的方式完成映射。在下面的示例中,输入参数 *ObjectDetectionRequest* 被分为了两个部分:*cmd* 作为了映射 URL 的一部分,而 *base64_img* 则用作请求消息体。
+
+```protobuf
+service TSRest {
+  rpc object_detection(ObjectDetectionRequest) returns(ObjectDetectionResponse) {
+    option (google.api.http) = {
+      post: "/v1/computation/object_detection/{cmd}"
+      body: "base64_img"
+    };
+  }
+}
+```
+
+另外一种常见的使用场景是搜索服务,其中搜索参数作为 URL 的一部分。
+
+```protobuf
+service TSRest {
+  rpc SearchMessage(MessageRequest) returns(Message) {
+    option (google.api.http) = {
+      get: "/v1/messages"
+    };
+  }
+}
+
+message MessageRequest {
+  string author = 1;
+  string title = 2;
+}
+```
+
+在这个例子中,*body* 没有指定,因此所有输入参数都映射成 URL 参数。在 SQL 中调用函数
+`SearchMessage({"author":"Author","title":"Message1"})` 将会映射成 `GET /v1/messages?author=Author&title=Message1`。
+
+更详细 protobuf http 映射的语法介绍,请参看 [转码映射](https://cloud.google.com/endpoints/docs/grpc/transcoding#adding_transcoding_mappings) 和 [httprule](https://cloud.google.com/endpoints/docs/grpc-service-config/reference/rpc/google.api#httprule).
+
+#### 用法
+
+使用 http 选项,必须在 proto 文件中导入如下文件:
+
+```protobuf
+syntax = "proto3";
+
+package yourpackage;
+
+import "google/api/annotations.proto";
+```
+
+因此,google api proto 文件必须在导入路径上。Kuiper 默认  `etc/services/schemas/google` 搭载了依赖的 proto 文件。用户无需在自定义服务里打包此依赖。
+
 ### 映射
 
 外部服务配置需要1个 json 文件和至少一个 schema(.proto) 文件。配置定义了服务映射的3个层次。
@@ -134,12 +200,13 @@ Protobuf 采用 proto3 格式,详细格式请参考 [proto3-spec](https://deve
 
 需要注意的是,REST 服务调用时参数将会解析为 json。其中,json 的键名来自于 proto 中的 message 定义的键名。Proto message 的键名在解析时会自动转化为小写驼峰格式。如果调用的 REST 服务参数不是这种格式,用户必须在 message 中指定 json_name 选项显式指定键名以防止自动转换。
 
-### 限制
+### 注意事项
 
 由于 REST 和 msgpack-rpc 并非原生采用 protobuf 定义,因此其使用有一些限制。
 
-REST 服务目前默认为 **POST**,且传输格式为 json。定义的protobuf 中:
-- 输入参数仅可以为 message 类型或者 *google.protobuf.StringValue* 类型。若输入参数为 *google.protobuf.StringValue*,则传入的参数必须为已编码的 json 字符串,例如 `"{\"name\":\"name1\",\"size\":1}"`。
+REST 服务目前默认为 **POST**,且传输格式为 json。用户可通过配置 proto 文件中的 [http 选项](#http选项) 来改变默认的 http 方法和 URL 等。REST 服务配置有如下限制:
+
+- 如果未指定 http 选项,输入参数仅可以为 message 类型或者 *google.protobuf.StringValue* 类型。若输入参数为 *google.protobuf.StringValue*,则传入的参数必须为已编码的 json 字符串,例如 `"{\"name\":\"name1\",\"size\":1}"`。
 
 msgpack-rpc 服务有以下限制:
 - 输入不能为空

+ 31 - 0
etc/services/schemas/google/api/annotations.proto

@@ -0,0 +1,31 @@
+// Copyright (c) 2015, Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package google.api;
+
+import "google/api/http.proto";
+import "google/protobuf/descriptor.proto";
+
+option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
+option java_multiple_files = true;
+option java_outer_classname = "AnnotationsProto";
+option java_package = "com.google.api";
+option objc_class_prefix = "GAPI";
+
+extend google.protobuf.MethodOptions {
+  // See `HttpRule`.
+  HttpRule http = 72295728;
+}

+ 376 - 0
etc/services/schemas/google/api/http.proto

@@ -0,0 +1,376 @@
+
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package google.api;
+
+option cc_enable_arenas = true;
+option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
+option java_multiple_files = true;
+option java_outer_classname = "HttpProto";
+option java_package = "com.google.api";
+option objc_class_prefix = "GAPI";
+
+// Defines the HTTP configuration for an API service. It contains a list of
+// [HttpRule][google.api.HttpRule], each specifying the mapping of an RPC method
+// to one or more HTTP REST API methods.
+message Http {
+  // A list of HTTP configuration rules that apply to individual API methods.
+  //
+  // **NOTE:** All service configuration rules follow "last one wins" order.
+  repeated HttpRule rules = 1;
+
+  // When set to true, URL path parameters will be fully URI-decoded except in
+  // cases of single segment matches in reserved expansion, where "%2F" will be
+  // left encoded.
+  //
+  // The default behavior is to not decode RFC 6570 reserved characters in multi
+  // segment matches.
+  bool fully_decode_reserved_expansion = 2;
+}
+
+// # gRPC Transcoding
+//
+// gRPC Transcoding is a feature for mapping between a gRPC method and one or
+// more HTTP REST endpoints. It allows developers to build a single API service
+// that supports both gRPC APIs and REST APIs. Many systems, including [Google
+// APIs](https://github.com/googleapis/googleapis),
+// [Cloud Endpoints](https://cloud.google.com/endpoints), [gRPC
+// Gateway](https://github.com/grpc-ecosystem/grpc-gateway),
+// and [Envoy](https://github.com/envoyproxy/envoy) proxy support this feature
+// and use it for large scale production services.
+//
+// `HttpRule` defines the schema of the gRPC/REST mapping. The mapping specifies
+// how different portions of the gRPC request message are mapped to the URL
+// path, URL query parameters, and HTTP request body. It also controls how the
+// gRPC response message is mapped to the HTTP response body. `HttpRule` is
+// typically specified as an `google.api.http` annotation on the gRPC method.
+//
+// Each mapping specifies a URL path template and an HTTP method. The path
+// template may refer to one or more fields in the gRPC request message, as long
+// as each field is a non-repeated field with a primitive (non-message) type.
+// The path template controls how fields of the request message are mapped to
+// the URL path.
+//
+// Example:
+//
+//     service Messaging {
+//       rpc GetMessage(GetMessageRequest) returns (Message) {
+//         option (google.api.http) = {
+//             get: "/v1/{name=messages/*}"
+//         };
+//       }
+//     }
+//     message GetMessageRequest {
+//       string name = 1; // Mapped to URL path.
+//     }
+//     message Message {
+//       string text = 1; // The resource content.
+//     }
+//
+// This enables an HTTP REST to gRPC mapping as below:
+//
+// HTTP | gRPC
+// -----|-----
+// `GET /v1/messages/123456`  | `GetMessage(name: "messages/123456")`
+//
+// Any fields in the request message which are not bound by the path template
+// automatically become HTTP query parameters if there is no HTTP request body.
+// For example:
+//
+//     service Messaging {
+//       rpc GetMessage(GetMessageRequest) returns (Message) {
+//         option (google.api.http) = {
+//             get:"/v1/messages/{message_id}"
+//         };
+//       }
+//     }
+//     message GetMessageRequest {
+//       message SubMessage {
+//         string subfield = 1;
+//       }
+//       string message_id = 1; // Mapped to URL path.
+//       int64 revision = 2;    // Mapped to URL query parameter `revision`.
+//       SubMessage sub = 3;    // Mapped to URL query parameter `sub.subfield`.
+//     }
+//
+// This enables a HTTP JSON to RPC mapping as below:
+//
+// HTTP | gRPC
+// -----|-----
+// `GET /v1/messages/123456?revision=2&sub.subfield=foo` |
+// `GetMessage(message_id: "123456" revision: 2 sub: SubMessage(subfield:
+// "foo"))`
+//
+// Note that fields which are mapped to URL query parameters must have a
+// primitive type or a repeated primitive type or a non-repeated message type.
+// In the case of a repeated type, the parameter can be repeated in the URL
+// as `...?param=A&param=B`. In the case of a message type, each field of the
+// message is mapped to a separate parameter, such as
+// `...?foo.a=A&foo.b=B&foo.c=C`.
+//
+// For HTTP methods that allow a request body, the `body` field
+// specifies the mapping. Consider a REST update method on the
+// message resource collection:
+//
+//     service Messaging {
+//       rpc UpdateMessage(UpdateMessageRequest) returns (Message) {
+//         option (google.api.http) = {
+//           patch: "/v1/messages/{message_id}"
+//           body: "message"
+//         };
+//       }
+//     }
+//     message UpdateMessageRequest {
+//       string message_id = 1; // mapped to the URL
+//       Message message = 2;   // mapped to the body
+//     }
+//
+// The following HTTP JSON to RPC mapping is enabled, where the
+// representation of the JSON in the request body is determined by
+// protos JSON encoding:
+//
+// HTTP | gRPC
+// -----|-----
+// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id:
+// "123456" message { text: "Hi!" })`
+//
+// The special name `*` can be used in the body mapping to define that
+// every field not bound by the path template should be mapped to the
+// request body.  This enables the following alternative definition of
+// the update method:
+//
+//     service Messaging {
+//       rpc UpdateMessage(Message) returns (Message) {
+//         option (google.api.http) = {
+//           patch: "/v1/messages/{message_id}"
+//           body: "*"
+//         };
+//       }
+//     }
+//     message Message {
+//       string message_id = 1;
+//       string text = 2;
+//     }
+//
+//
+// The following HTTP JSON to RPC mapping is enabled:
+//
+// HTTP | gRPC
+// -----|-----
+// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id:
+// "123456" text: "Hi!")`
+//
+// Note that when using `*` in the body mapping, it is not possible to
+// have HTTP parameters, as all fields not bound by the path end in
+// the body. This makes this option more rarely used in practice when
+// defining REST APIs. The common usage of `*` is in custom methods
+// which don't use the URL at all for transferring data.
+//
+// It is possible to define multiple HTTP methods for one RPC by using
+// the `additional_bindings` option. Example:
+//
+//     service Messaging {
+//       rpc GetMessage(GetMessageRequest) returns (Message) {
+//         option (google.api.http) = {
+//           get: "/v1/messages/{message_id}"
+//           additional_bindings {
+//             get: "/v1/users/{user_id}/messages/{message_id}"
+//           }
+//         };
+//       }
+//     }
+//     message GetMessageRequest {
+//       string message_id = 1;
+//       string user_id = 2;
+//     }
+//
+// This enables the following two alternative HTTP JSON to RPC mappings:
+//
+// HTTP | gRPC
+// -----|-----
+// `GET /v1/messages/123456` | `GetMessage(message_id: "123456")`
+// `GET /v1/users/me/messages/123456` | `GetMessage(user_id: "me" message_id:
+// "123456")`
+//
+// ## Rules for HTTP mapping
+//
+// 1. Leaf request fields (recursive expansion nested messages in the request
+//    message) are classified into three categories:
+//    - Fields referred by the path template. They are passed via the URL path.
+//    - Fields referred by the [HttpRule.body][google.api.HttpRule.body]. They are passed via the HTTP
+//      request body.
+//    - All other fields are passed via the URL query parameters, and the
+//      parameter name is the field path in the request message. A repeated
+//      field can be represented as multiple query parameters under the same
+//      name.
+//  2. If [HttpRule.body][google.api.HttpRule.body] is "*", there is no URL query parameter, all fields
+//     are passed via URL path and HTTP request body.
+//  3. If [HttpRule.body][google.api.HttpRule.body] is omitted, there is no HTTP request body, all
+//     fields are passed via URL path and URL query parameters.
+//
+// ### Path template syntax
+//
+//     Template = "/" Segments [ Verb ] ;
+//     Segments = Segment { "/" Segment } ;
+//     Segment  = "*" | "**" | LITERAL | Variable ;
+//     Variable = "{" FieldPath [ "=" Segments ] "}" ;
+//     FieldPath = IDENT { "." IDENT } ;
+//     Verb     = ":" LITERAL ;
+//
+// The syntax `*` matches a single URL path segment. The syntax `**` matches
+// zero or more URL path segments, which must be the last part of the URL path
+// except the `Verb`.
+//
+// The syntax `Variable` matches part of the URL path as specified by its
+// template. A variable template must not contain other variables. If a variable
+// matches a single path segment, its template may be omitted, e.g. `{var}`
+// is equivalent to `{var=*}`.
+//
+// The syntax `LITERAL` matches literal text in the URL path. If the `LITERAL`
+// contains any reserved character, such characters should be percent-encoded
+// before the matching.
+//
+// If a variable contains exactly one path segment, such as `"{var}"` or
+// `"{var=*}"`, when such a variable is expanded into a URL path on the client
+// side, all characters except `[-_.~0-9a-zA-Z]` are percent-encoded. The
+// server side does the reverse decoding. Such variables show up in the
+// [Discovery
+// Document](https://developers.google.com/discovery/v1/reference/apis) as
+// `{var}`.
+//
+// If a variable contains multiple path segments, such as `"{var=foo/*}"`
+// or `"{var=**}"`, when such a variable is expanded into a URL path on the
+// client side, all characters except `[-_.~/0-9a-zA-Z]` are percent-encoded.
+// The server side does the reverse decoding, except "%2F" and "%2f" are left
+// unchanged. Such variables show up in the
+// [Discovery
+// Document](https://developers.google.com/discovery/v1/reference/apis) as
+// `{+var}`.
+//
+// ## Using gRPC API Service Configuration
+//
+// gRPC API Service Configuration (service config) is a configuration language
+// for configuring a gRPC service to become a user-facing product. The
+// service config is simply the YAML representation of the `google.api.Service`
+// proto message.
+//
+// As an alternative to annotating your proto file, you can configure gRPC
+// transcoding in your service config YAML files. You do this by specifying a
+// `HttpRule` that maps the gRPC method to a REST endpoint, achieving the same
+// effect as the proto annotation. This can be particularly useful if you
+// have a proto that is reused in multiple services. Note that any transcoding
+// specified in the service config will override any matching transcoding
+// configuration in the proto.
+//
+// Example:
+//
+//     http:
+//       rules:
+//         # Selects a gRPC method and applies HttpRule to it.
+//         - selector: example.v1.Messaging.GetMessage
+//           get: /v1/messages/{message_id}/{sub.subfield}
+//
+// ## Special notes
+//
+// When gRPC Transcoding is used to map a gRPC to JSON REST endpoints, the
+// proto to JSON conversion must follow the [proto3
+// specification](https://developers.google.com/protocol-buffers/docs/proto3#json).
+//
+// While the single segment variable follows the semantics of
+// [RFC 6570](https://tools.ietf.org/html/rfc6570) Section 3.2.2 Simple String
+// Expansion, the multi segment variable **does not** follow RFC 6570 Section
+// 3.2.3 Reserved Expansion. The reason is that the Reserved Expansion
+// does not expand special characters like `?` and `#`, which would lead
+// to invalid URLs. As the result, gRPC Transcoding uses a custom encoding
+// for multi segment variables.
+//
+// The path variables **must not** refer to any repeated or mapped field,
+// because client libraries are not capable of handling such variable expansion.
+//
+// The path variables **must not** capture the leading "/" character. The reason
+// is that the most common use case "{var}" does not capture the leading "/"
+// character. For consistency, all path variables must share the same behavior.
+//
+// Repeated message fields must not be mapped to URL query parameters, because
+// no client library can support such complicated mapping.
+//
+// If an API needs to use a JSON array for request or response body, it can map
+// the request or response body to a repeated field. However, some gRPC
+// Transcoding implementations may not support this feature.
+message HttpRule {
+  // Selects a method to which this rule applies.
+  //
+  // Refer to [selector][google.api.DocumentationRule.selector] for syntax details.
+  string selector = 1;
+
+  // Determines the URL pattern is matched by this rules. This pattern can be
+  // used with any of the {get|put|post|delete|patch} methods. A custom method
+  // can be defined using the 'custom' field.
+  oneof pattern {
+    // Maps to HTTP GET. Used for listing and getting information about
+    // resources.
+    string get = 2;
+
+    // Maps to HTTP PUT. Used for replacing a resource.
+    string put = 3;
+
+    // Maps to HTTP POST. Used for creating a resource or performing an action.
+    string post = 4;
+
+    // Maps to HTTP DELETE. Used for deleting a resource.
+    string delete = 5;
+
+    // Maps to HTTP PATCH. Used for updating a resource.
+    string patch = 6;
+
+    // The custom pattern is used for specifying an HTTP method that is not
+    // included in the `pattern` field, such as HEAD, or "*" to leave the
+    // HTTP method unspecified for this rule. The wild-card rule is useful
+    // for services that provide content to Web (HTML) clients.
+    CustomHttpPattern custom = 8;
+  }
+
+  // The name of the request field whose value is mapped to the HTTP request
+  // body, or `*` for mapping all request fields not captured by the path
+  // pattern to the HTTP body, or omitted for not having any HTTP request body.
+  //
+  // NOTE: the referred field must be present at the top-level of the request
+  // message type.
+  string body = 7;
+
+  // Optional. The name of the response field whose value is mapped to the HTTP
+  // response body. When omitted, the entire response message will be used
+  // as the HTTP response body.
+  //
+  // NOTE: The referred field must be present at the top-level of the response
+  // message type.
+  string response_body = 12;
+
+  // Additional HTTP bindings for the selector. Nested bindings must
+  // not contain an `additional_bindings` field themselves (that is,
+  // the nesting may only be one level deep).
+  repeated HttpRule additional_bindings = 11;
+}
+
+// A custom pattern is used for defining custom HTTP verb.
+message CustomHttpPattern {
+  // The name of this custom HTTP verb.
+  string kind = 1;
+
+  // The path matched by this custom verb.
+  string path = 2;
+}

+ 7 - 5
services/executors.go

@@ -16,7 +16,6 @@ import (
 	"net/http"
 	"net/rpc"
 	"net/url"
-	"path"
 	"reflect"
 	"strings"
 	"sync"
@@ -183,13 +182,16 @@ func (h *httpExecutor) InvokeFunction(ctx api.FunctionContext, name string, para
 			Timeout:   time.Duration(h.timeout) * time.Millisecond}
 	}
 
-	json, err := h.descriptor.ConvertParamsToJson(name, params)
+	hm, err := h.descriptor.ConvertHttpMapping(name, params)
 	if err != nil {
 		return nil, err
 	}
-	u := *h.addr
-	u.Path = path.Join(u.Path, name)
-	resp, err := common.Send(ctx.GetLogger(), h.conn, "json", http.MethodPost, u.String(), h.restOpt.Headers, false, json)
+	u := h.addr.String() + hm.Uri
+	_, err = url.Parse(u)
+	if err != nil {
+		return nil, err
+	}
+	resp, err := common.Send(ctx.GetLogger(), h.conn, "json", hm.Method, u, h.restOpt.Headers, false, hm.Body)
 	if err != nil {
 		return nil, err
 	}

+ 270 - 51
services/external_service_rule_test.go

@@ -10,6 +10,7 @@ import (
 	"github.com/emqx/kuiper/xstream/topotest"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/golang/protobuf/ptypes/wrappers"
+	"github.com/gorilla/mux"
 	"github.com/msgpack-rpc/msgpack-rpc-go/rpc"
 	"google.golang.org/grpc"
 	"io"
@@ -17,6 +18,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"reflect"
+	"strconv"
 	"testing"
 )
 
@@ -58,6 +60,26 @@ type EncodedRequest struct {
 	Size int    `json:"size,omitempty"`
 }
 
+type ShelfMessage struct {
+	Id    string `json:"id,omitempty"`
+	Theme string `json:"theme,omitempty"`
+}
+
+type ShelfMessageOut struct {
+	Id    int64  `json:"id,omitempty"`
+	Theme string `json:"theme,omitempty"`
+}
+
+type BookMessage struct {
+	Id     int64  `json:"id,omitempty"`
+	Author string `json:"author,omitempty"`
+	Title  string `json:"title,omitempty"`
+}
+
+type MessageMessage struct {
+	Text string `json:"text,omitempty"`
+}
+
 func TestRestService(t *testing.T) {
 	// mock server, the port is set in the sample.json
 	l, err := net.Listen("tcp", "127.0.0.1:51234")
@@ -66,61 +88,109 @@ func TestRestService(t *testing.T) {
 		t.FailNow()
 	}
 	count := 0
-	server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		path := r.URL.Path
+	router := mux.NewRouter()
+	router.HandleFunc("/SayHello", func(w http.ResponseWriter, r *http.Request) {
+		body := &HelloRequest{}
+		err := json.NewDecoder(r.Body).Decode(body)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+		}
+		out := &HelloReply{Message: body.Name}
+		jsonOut(w, err, out)
+	}).Methods(http.MethodPost)
+	router.HandleFunc("/object_detection", func(w http.ResponseWriter, r *http.Request) {
+		req := &ObjectDetectRequest{}
+		err := json.NewDecoder(r.Body).Decode(req)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+		}
+		if req.Image == "" {
+			http.Error(w, "image is not found", http.StatusBadRequest)
+		}
+		out := &ObjectDetectResponse{
+			Info:   req.Command,
+			Code:   200,
+			Image:  req.Image,
+			Result: req.Command + " success",
+			Type:   "S",
+		}
+		jsonOut(w, err, out)
+	}).Methods(http.MethodPost)
+	router.HandleFunc("/getStatus", func(w http.ResponseWriter, r *http.Request) {
+		result := count%2 == 0
+		count++
+		io.WriteString(w, fmt.Sprintf("%v", result))
+	}).Methods(http.MethodPost)
+	router.HandleFunc("/RestEncodedJson", func(w http.ResponseWriter, r *http.Request) {
+		req := &EncodedRequest{}
+		err := json.NewDecoder(r.Body).Decode(req)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+		}
+		io.WriteString(w, req.Name)
+	}).Methods(http.MethodPost)
+	router.HandleFunc("/bookshelf/v1/shelves", func(w http.ResponseWriter, r *http.Request) {
+		req := &ShelfMessage{}
+		err := json.NewDecoder(r.Body).Decode(req)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+		}
+		if req.Id == "" || req.Theme == "" {
+			http.Error(w, "empty request", http.StatusBadRequest)
+		}
+		idint, _ := strconv.Atoi(req.Id)
+		out := ShelfMessageOut{Id: int64(idint), Theme: req.Theme}
+		jsonOut(w, err, out)
+	}).Methods(http.MethodPost)
+	router.HandleFunc("/bookshelf/v1/shelves/{shelf}/books/{book}", func(w http.ResponseWriter, r *http.Request) {
 		defer r.Body.Close()
-		var (
-			out interface{}
-		)
-		switch path {
-		case "/SayHello":
-			body := &HelloRequest{}
-			err := json.NewDecoder(r.Body).Decode(body)
-			if err != nil {
-				http.Error(w, err.Error(), http.StatusBadRequest)
-			}
-			out = &HelloReply{Message: body.Name}
-		case "/object_detection":
-			req := &ObjectDetectRequest{}
-			err := json.NewDecoder(r.Body).Decode(req)
-			if err != nil {
-				http.Error(w, err.Error(), http.StatusBadRequest)
-			}
-			if req.Image == "" {
-				http.Error(w, "image is not found", http.StatusBadRequest)
-			}
-			out = &ObjectDetectResponse{
-				Info:   req.Command,
-				Code:   200,
-				Image:  req.Image,
-				Result: req.Command + " success",
-				Type:   "S",
-			}
-		case "/getStatus":
-			r := count%2 == 0
-			count++
-			io.WriteString(w, fmt.Sprintf("%v", r))
-			return
-		case "/RestEncodedJson":
-			req := &EncodedRequest{}
-			err := json.NewDecoder(r.Body).Decode(req)
-			if err != nil {
-				http.Error(w, err.Error(), http.StatusBadRequest)
-			}
-			io.WriteString(w, req.Name)
-			return
-		default:
-			http.Error(w, "path not supported", http.StatusBadRequest)
+		vars := mux.Vars(r)
+		shelf, book := vars["shelf"], vars["book"]
+		if shelf == "" || book == "" {
+			http.Error(w, "empty request", http.StatusBadRequest)
 		}
-
-		w.Header().Add("Content-Type", "application/json")
-		enc := json.NewEncoder(w)
-		err = enc.Encode(out)
-		// Problems encoding
+		idint, _ := strconv.Atoi(book)
+		out := BookMessage{Id: int64(idint), Author: "NA", Title: "title_" + book}
+		jsonOut(w, err, out)
+	}).Methods(http.MethodGet)
+	router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
+		defer r.Body.Close()
+		vars := mux.Vars(r)
+		name := vars["name"]
+		if name == "" {
+			http.Error(w, "empty request", http.StatusBadRequest)
+		}
+		out := MessageMessage{Text: name + " content"}
+		jsonOut(w, err, out)
+	}).Methods(http.MethodGet)
+	router.HandleFunc("/messaging/v1/messages/filter/{name}", func(w http.ResponseWriter, r *http.Request) {
+		defer r.Body.Close()
+		vars := mux.Vars(r)
+		name := vars["name"]
+		q := r.URL.Query()
+		rev, sub := q.Get("revision"), q.Get("sub.subfield")
+		if name == "" || rev == "" || sub == "" {
+			http.Error(w, "empty request", http.StatusBadRequest)
+		}
+		out := MessageMessage{Text: name + rev + sub}
+		jsonOut(w, err, out)
+	}).Methods(http.MethodGet)
+	router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
+		defer r.Body.Close()
+		vars := mux.Vars(r)
+		name := vars["name"]
+		if name == "" {
+			http.Error(w, "empty request", http.StatusBadRequest)
+		}
+		body := &MessageMessage{}
+		err := json.NewDecoder(r.Body).Decode(body)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusBadRequest)
 		}
-	}))
+		out := MessageMessage{Text: body.Text}
+		jsonOut(w, err, out)
+	}).Methods(http.MethodPut, http.MethodPatch)
+	server := httptest.NewUnstartedServer(router)
 	server.Listener.Close()
 	server.Listener = l
 
@@ -129,7 +199,7 @@ func TestRestService(t *testing.T) {
 
 	defer server.Close()
 	//Reset
-	streamList := []string{"helloStr", "commands", "fakeBin"}
+	streamList := []string{"helloStr", "commands", "fakeBin", "shelves", "demo", "mes"}
 	topotest.HandleStream(false, streamList, t)
 	//Data setup
 	var tests = []topotest.RuleTest{
@@ -286,6 +356,145 @@ func TestRestService(t *testing.T) {
 				"sink_mockSink_0_records_in_total":  int64(3),
 				"sink_mockSink_0_records_out_total": int64(3),
 			},
+		}, {
+			Name: `TestRestRule6`,
+			Sql:  `SELECT CreateShelf(shelf)->theme as theme FROM shelves`,
+			R: [][]map[string]interface{}{
+				{{
+					"theme": "tandra",
+				}},
+				{{
+					"theme": "claro",
+				}},
+				{{
+					"theme": "dark",
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(3),
+				"op_2_project_0_records_out_total":  int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
+			},
+		}, {
+			Name: `TestRestRule7`,
+			Sql:  `SELECT GetBook(size, ts)->title as title FROM demo WHERE size > 3 `,
+			R: [][]map[string]interface{}{
+				{{
+					"title": "title_1541152486822",
+				}},
+				{{
+					"title": "title_1541152488442",
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_filter_0_exceptions_total":   int64(0),
+				"op_2_filter_0_process_latency_us": int64(0),
+				"op_2_filter_0_records_in_total":   int64(5),
+				"op_2_filter_0_records_out_total":  int64(2),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(2),
+				"sink_mockSink_0_records_out_total": int64(2),
+			},
+		}, {
+			Name: `TestRestRule8`,
+			Sql:  `SELECT GetMessage(concat("messages/",ts))->text as message FROM demo WHERE size > 3`,
+			R: [][]map[string]interface{}{
+				{{
+					"message": "1541152486822 content",
+				}},
+				{{
+					"message": "1541152488442 content",
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_filter_0_exceptions_total":   int64(0),
+				"op_2_filter_0_process_latency_us": int64(0),
+				"op_2_filter_0_records_in_total":   int64(5),
+				"op_2_filter_0_records_out_total":  int64(2),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(2),
+				"sink_mockSink_0_records_out_total": int64(2),
+			},
+		}, {
+			Name: `TestRestRule9`,
+			Sql:  `SELECT SearchMessage(name, size, shelf)->text as message FROM shelves`,
+			R: [][]map[string]interface{}{
+				{{
+					"message": "name12sub1",
+				}},
+				{{
+					"message": "name23sub2",
+				}},
+				{{
+					"message": "name34sub3",
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(3),
+				"op_2_project_0_records_out_total":  int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
+			},
+			// TODO support * as one of the parameters
+			//},{
+			//	Name: `TestRestRule10`,
+			//	Sql:  `SELECT UpdateMessage(message_id, *)->text as message FROM mes`,
+			//	R: [][]map[string]interface{}{
+			//		{{
+			//			"message": "message1",
+			//		}},
+			//		{{
+			//			"message": "message2",
+			//		}},
+			//		{{
+			//			"message": "message3",
+			//		}},
+			//	},
+			//	M: map[string]interface{}{
+			//		"op_2_project_0_exceptions_total":   int64(0),
+			//		"op_2_project_0_process_latency_us": int64(0),
+			//		"op_2_project_0_records_in_total":   int64(3),
+			//		"op_2_project_0_records_out_total":  int64(3),
+			//
+			//		"sink_mockSink_0_exceptions_total":  int64(0),
+			//		"sink_mockSink_0_records_in_total":  int64(3),
+			//		"sink_mockSink_0_records_out_total": int64(3),
+			//	},
+		}, {
+			Name: `TestRestRule11`,
+			Sql:  `SELECT PatchMessage(message_id, text)->text as message FROM mes`,
+			R: [][]map[string]interface{}{
+				{{
+					"message": "message1",
+				}},
+				{{
+					"message": "message2",
+				}},
+				{{
+					"message": "message3",
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(3),
+				"op_2_project_0_records_out_total":  int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
+			},
 		},
 	}
 	topotest.HandleStream(true, streamList, t)
@@ -295,6 +504,16 @@ func TestRestService(t *testing.T) {
 	}, 0)
 }
 
+func jsonOut(w http.ResponseWriter, err error, out interface{}) {
+	w.Header().Add("Content-Type", "application/json")
+	enc := json.NewEncoder(w)
+	err = enc.Encode(out)
+	// Problems encoding
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+	}
+}
+
 type Resolver map[string]reflect.Value
 
 func (self Resolver) Resolve(name string, _ []reflect.Value) (reflect.Value, error) {

+ 60 - 0
services/manager_test.go

@@ -92,6 +92,66 @@ func TestInitByFiles(t *testing.T) {
 		},
 	}
 	funcs := map[string]*functionContainer{
+		"ListShelves": {
+			ServiceName:   "httpSample",
+			InterfaceName: "bookshelf",
+			MethodName:    "ListShelves",
+		},
+		"CreateShelf": {
+			ServiceName:   "httpSample",
+			InterfaceName: "bookshelf",
+			MethodName:    "CreateShelf",
+		},
+		"GetShelf": {
+			ServiceName:   "httpSample",
+			InterfaceName: "bookshelf",
+			MethodName:    "GetShelf",
+		},
+		"DeleteShelf": {
+			ServiceName:   "httpSample",
+			InterfaceName: "bookshelf",
+			MethodName:    "DeleteShelf",
+		},
+		"ListBooks": {
+			ServiceName:   "httpSample",
+			InterfaceName: "bookshelf",
+			MethodName:    "ListBooks",
+		},
+		"createBook": {
+			ServiceName:   "httpSample",
+			InterfaceName: "bookshelf",
+			MethodName:    "CreateBook",
+		},
+		"GetBook": {
+			ServiceName:   "httpSample",
+			InterfaceName: "bookshelf",
+			MethodName:    "GetBook",
+		},
+		"DeleteBook": {
+			ServiceName:   "httpSample",
+			InterfaceName: "bookshelf",
+			MethodName:    "DeleteBook",
+		},
+		"GetMessage": {
+			ServiceName:   "httpSample",
+			InterfaceName: "messaging",
+			MethodName:    "GetMessage",
+		},
+		"SearchMessage": {
+			ServiceName:   "httpSample",
+			InterfaceName: "messaging",
+			MethodName:    "SearchMessage",
+		},
+		"UpdateMessage": {
+			ServiceName:   "httpSample",
+			InterfaceName: "messaging",
+			MethodName:    "UpdateMessage",
+		},
+		"PatchMessage": {
+			ServiceName:   "httpSample",
+			InterfaceName: "messaging",
+			MethodName:    "PatchMessage",
+		},
 		"helloFromGrpc": {
 			ServiceName:   "sample",
 			InterfaceName: "tsrpc",

+ 26 - 21
services/schema.go

@@ -68,6 +68,7 @@ type multiplexDescriptor interface {
 	jsonDescriptor
 	textDescriptor
 	interfaceDescriptor
+	httpMapping
 }
 
 var ( //Do not call these directly, use the get methods
@@ -103,9 +104,12 @@ func parse(schema schema, file string) (descriptor, error) {
 		} else {
 			result := &wrappedProtoDescriptor{
 				FileDescriptor: fds[0],
-				methods:        make(map[string]*desc.MethodDescriptor),
 				mf:             dynamic.NewMessageFactoryWithDefaults(),
 			}
+			err := result.parseHttpOptions()
+			if err != nil {
+				return nil, err
+			}
 			reg.Store(info, result)
 			return result, nil
 		}
@@ -116,8 +120,8 @@ func parse(schema schema, file string) (descriptor, error) {
 
 type wrappedProtoDescriptor struct {
 	*desc.FileDescriptor
-	methods map[string]*desc.MethodDescriptor
-	mf      *dynamic.MessageFactory
+	methodOptions map[string]*httpOptions
+	mf            *dynamic.MessageFactory
 }
 
 //TODO support for duplicate names
@@ -144,7 +148,7 @@ func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface
 		return nil, fmt.Errorf("can't find method %s in proto", method)
 	}
 	im := m.GetInputType()
-	return convertParams(im, params)
+	return d.convertParams(im, params)
 }
 
 func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
@@ -154,7 +158,7 @@ func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []
 	}
 	im := m.GetInputType()
 	message := d.mf.NewDynamicMessage(im)
-	typedParams, err := convertParams(im, params)
+	typedParams, err := d.convertParams(im, params)
 	if err != nil {
 		return nil, err
 	}
@@ -196,7 +200,7 @@ func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []int
 	}
 }
 
-func convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
+func (d *wrappedProtoDescriptor) convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
 	fields := im.GetFields()
 	var result []interface{}
 	switch len(params) {
@@ -209,14 +213,14 @@ func convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interfac
 	case 1:
 		// If it is map, try unfold it
 		// TODO custom error for non map or map name not match
-		if r, err := unfoldMap(im, params[0]); err != nil {
+		if r, err := d.unfoldMap(im, params[0]); err != nil {
 			common.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
 		} else {
 			return r, nil
 		}
 		// For non map params, treat it as special case of multiple params
 		if len(fields) == 1 {
-			param0, err := encodeField(fields[0], params[0])
+			param0, err := d.encodeField(fields[0], params[0])
 			if err != nil {
 				return nil, err
 			}
@@ -227,7 +231,7 @@ func convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interfac
 	default:
 		if len(fields) == len(params) {
 			for i, field := range fields {
-				param, err := encodeField(field, params[i])
+				param, err := d.encodeField(field, params[i])
 				if err != nil {
 					return nil, err
 				}
@@ -290,7 +294,7 @@ func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescr
 	return m
 }
 
-func unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
+func (d *wrappedProtoDescriptor) unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
 	fields := ft.GetFields()
 	result := make([]interface{}, len(fields))
 	if m, ok := xsql.ToMessage(i); ok {
@@ -299,7 +303,7 @@ func unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error)
 			if !ok {
 				return nil, fmt.Errorf("field %s not found", field.GetName())
 			}
-			fv, err := encodeField(field, v)
+			fv, err := d.encodeField(field, v)
 			if err != nil {
 				return nil, err
 			}
@@ -311,25 +315,26 @@ func unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error)
 	return result, nil
 }
 
-func encodeMap(fields []*desc.FieldDescriptor, i interface{}) (map[string]interface{}, error) {
-	var result map[string]interface{}
-	if m, ok := i.(map[string]interface{}); ok && len(m) == len(fields) {
+func (d *wrappedProtoDescriptor) encodeMap(im *desc.MessageDescriptor, i interface{}) (*dynamic.Message, error) {
+	result := d.mf.NewDynamicMessage(im)
+	fields := im.GetFields()
+	if m, ok := i.(map[string]interface{}); ok {
 		for _, field := range fields {
 			v, ok := m[field.GetName()]
 			if !ok {
 				return nil, fmt.Errorf("field %s not found", field.GetName())
 			}
-			fv, err := encodeField(field, v)
+			fv, err := d.encodeField(field, v)
 			if err != nil {
 				return nil, err
 			}
-			result[field.GetName()] = fv
+			result.SetFieldByName(field.GetName(), fv)
 		}
 	}
 	return result, nil
 }
 
-func encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
+func (d *wrappedProtoDescriptor) encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
 	fn := field.GetName()
 	ft := field.GetType()
 	if field.IsRepeated() {
@@ -381,7 +386,7 @@ func encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error
 			result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
 				r, err := common.ToStringMap(v)
 				if err == nil {
-					return encodeMap(field.GetMessageType().GetFields(), r)
+					return d.encodeMap(field.GetMessageType(), r)
 				} else {
 					return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
 				}
@@ -394,11 +399,11 @@ func encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error
 		}
 		return result, err
 	} else {
-		return encodeSingleField(field, v)
+		return d.encodeSingleField(field, v)
 	}
 }
 
-func encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
+func (d *wrappedProtoDescriptor) encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
 	fn := field.GetName()
 	switch field.GetType() {
 	case dpb.FieldDescriptorProto_TYPE_DOUBLE:
@@ -467,7 +472,7 @@ func encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{},
 	case dpb.FieldDescriptorProto_TYPE_MESSAGE:
 		r, err := common.ToStringMap(v)
 		if err == nil {
-			return encodeMap(field.GetMessageType().GetFields(), r)
+			return d.encodeMap(field.GetMessageType(), r)
 		} else {
 			return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
 		}

+ 283 - 0
services/schema_http.go

@@ -0,0 +1,283 @@
+package services
+
+import (
+	"fmt"
+	"github.com/emqx/kuiper/common"
+	dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
+	"github.com/jhump/protoreflect/desc"
+	"github.com/jhump/protoreflect/dynamic"
+	"google.golang.org/protobuf/reflect/protoreflect"
+	"net/http"
+	"regexp"
+	"strings"
+)
+
+type httpConnMeta struct {
+	Method string
+	Uri    string // The Uri is a relative path which must start with /
+	Body   []byte
+}
+
+type httpMapping interface {
+	ConvertHttpMapping(method string, params []interface{}) (*httpConnMeta, error)
+}
+
+const (
+	httpAPI      = "google.api.http"
+	wildcardBody = "*"
+	emptyBody    = ""
+)
+
+type httpOptions struct {
+	Method      string
+	UriTemplate *uriTempalte // must not nil
+	BodyField   string
+}
+
+type uriTempalte struct {
+	Template string
+	Fields   []*field
+}
+
+type field struct {
+	name   string
+	prefix string
+}
+
+func (d *wrappedProtoDescriptor) parseHttpOptions() error {
+	optionsMap := make(map[string]*httpOptions)
+	var err error
+	for _, s := range d.GetServices() {
+		for _, m := range s.GetMethods() {
+			options := m.GetMethodOptions()
+			var ho *httpOptions
+			// Find http option and exit loop at once. If not found, http option is nil
+			options.ProtoReflect().Range(func(d protoreflect.FieldDescriptor, v protoreflect.Value) bool {
+				if d.FullName() == httpAPI {
+					if d.Kind() == protoreflect.MessageKind {
+						var (
+							uriOpt  string
+							bodyOpt string
+							err     error
+						)
+						ho = &httpOptions{}
+						v.Message().Range(func(din protoreflect.FieldDescriptor, vin protoreflect.Value) bool {
+							switch din.Name() {
+							case "get":
+								ho.Method = http.MethodGet
+								uriOpt, err = getUriOpt(din, vin)
+							case "put":
+								ho.Method = http.MethodPut
+								uriOpt, err = getUriOpt(din, vin)
+							case "delete":
+								ho.Method = http.MethodDelete
+								uriOpt, err = getUriOpt(din, vin)
+							case "post":
+								ho.Method = http.MethodPost
+								uriOpt, err = getUriOpt(din, vin)
+							case "patch":
+								ho.Method = http.MethodPatch
+								uriOpt, err = getUriOpt(din, vin)
+							case "body":
+								bodyOpt, err = getUriOpt(din, vin)
+							default:
+								err = fmt.Errorf("unsupported option %s", din.Name())
+							}
+							if err != nil {
+								return false
+							}
+							return true
+						})
+						if err != nil {
+							return false
+						}
+						err = ho.convertUri(m, uriOpt, bodyOpt)
+						if err != nil {
+							return false
+						}
+					} else {
+						err = fmt.Errorf("invalid http option for method %s in proto", m.GetName())
+					}
+					return false
+				}
+				if err != nil {
+					return false
+				}
+				return true
+			})
+			if err != nil {
+				return err
+			}
+			if ho != nil {
+				optionsMap[m.GetName()] = ho
+			}
+		}
+	}
+	d.methodOptions = optionsMap
+	return err
+}
+
+func (d *wrappedProtoDescriptor) ConvertHttpMapping(method string, params []interface{}) (*httpConnMeta, error) {
+	hcm := &httpConnMeta{}
+	var (
+		json []byte
+		err  error
+	)
+	if ho, ok := d.methodOptions[method]; ok {
+		message, err := d.ConvertParamsToMessage(method, params)
+		if err != nil {
+			return nil, err
+		}
+		if len(ho.UriTemplate.Fields) > 0 {
+			args := make([]interface{}, len(ho.UriTemplate.Fields))
+			for i, v := range ho.UriTemplate.Fields {
+				fv, err := getMessageFieldWithDots(message, v.name)
+				if err != nil {
+					return nil, err
+				}
+				args[i], err = common.ToString(fv, common.CONVERT_ALL)
+				if err != nil {
+					return nil, fmt.Errorf("invalid field %s(%v) as http option, must be string", v.name, fv)
+				}
+				// Remove all params to be used in the params, the left params are for BODY
+				level1Names := strings.Split(v.name, ".")
+				message.ClearFieldByName(level1Names[0])
+				if v.prefix != "" {
+					if strings.HasPrefix(args[i].(string), v.prefix) {
+						continue
+					} else {
+						return nil, fmt.Errorf("invalid field %s(%s) as http option, must have prefix %s", v.name, args[i], v.prefix)
+					}
+				}
+			}
+			hcm.Uri = fmt.Sprintf(ho.UriTemplate.Template, args...)
+		} else {
+			hcm.Uri = ho.UriTemplate.Template
+		}
+		hcm.Method = ho.Method
+		switch ho.BodyField {
+		case wildcardBody:
+			json, err = message.MarshalJSON()
+		case emptyBody:
+			json = nil
+		default:
+			bodyMessage := message.GetFieldByName(ho.BodyField)
+			if bm, ok := bodyMessage.(*dynamic.Message); ok {
+				json, err = bm.MarshalJSON()
+			} else {
+				return nil, fmt.Errorf("invalid body field %s, must be a message", ho.BodyField)
+			}
+		}
+	} else { // If options are not set, use the default setting
+		hcm.Method = "POST"
+		hcm.Uri = "/" + method
+		json, err = d.ConvertParamsToJson(method, params)
+	}
+	if err != nil {
+		return nil, err
+	}
+	hcm.Body = json
+	return hcm, nil
+}
+
+func getMessageFieldWithDots(message *dynamic.Message, name string) (interface{}, error) {
+	secs := strings.Split(name, ".")
+	currentMessage := message
+	for i, sec := range secs {
+		if i == len(secs)-1 {
+			return currentMessage.GetFieldByName(sec), nil
+		} else {
+			c := currentMessage.GetFieldByName(sec)
+			if cm, ok := c.(*dynamic.Message); ok {
+				currentMessage = cm
+			} else {
+				return nil, fmt.Errorf("fail to find field %s", name)
+			}
+		}
+	}
+	return nil, fmt.Errorf("fail to find field %s", name)
+}
+
+func getUriOpt(d protoreflect.FieldDescriptor, v protoreflect.Value) (string, error) {
+	if d.Kind() != protoreflect.StringKind {
+		return "", fmt.Errorf("invalid type for %s option, string required", d.Name())
+	}
+	return v.String(), nil
+}
+
+func (ho *httpOptions) convertUri(md *desc.MethodDescriptor, uriOpt string, bodyOpt string) error {
+	fmap := make(map[string]bool) // the value represents if the key is still available (not used) so that they can be removed from *
+	for _, f := range md.GetInputType().GetFields() {
+		fmap[f.GetName()] = true
+	}
+	result := &uriTempalte{}
+	re := regexp.MustCompile(`\{(.*?)\}`)
+	m := re.FindAllStringSubmatch(uriOpt, -1)
+	if len(m) > 0 {
+		result.Template = re.ReplaceAllString(uriOpt, "%s")
+		var fields []*field
+		for _, e := range m {
+			f := &field{}
+			rr := strings.Split(e[1], "=")
+			if len(rr) == 2 {
+				if strings.HasSuffix(rr[1], "*") {
+					f.name = rr[0]
+					f.prefix = rr[1][:len(rr[1])-1]
+				} else {
+					return fmt.Errorf("invalid uri %s in http option", uriOpt)
+				}
+			} else if len(rr) == 1 {
+				f.name = e[1]
+			} else {
+				return fmt.Errorf("invalid uri %s in http option", uriOpt)
+			}
+			_, ok := fmap[f.name]
+			if !ok {
+				return fmt.Errorf("invalid uri %s in http option, %s field not found", uriOpt, f.name)
+			}
+			fmap[f.name] = false
+			fields = append(fields, f)
+		}
+		result.Fields = fields
+	} else {
+		result.Template = uriOpt
+	}
+	switch bodyOpt {
+	case wildcardBody:
+		ho.BodyField = bodyOpt
+	default:
+		if bodyOpt != emptyBody {
+			if _, ok := fmap[bodyOpt]; !ok {
+				return fmt.Errorf("invalid body %s, field not found", bodyOpt)
+			} else {
+				fmap[bodyOpt] = false
+			}
+		}
+		ho.BodyField = bodyOpt
+		paramAdded := false
+		result.updateUriParams(md.GetInputType(), "", fmap, paramAdded)
+	}
+	ho.UriTemplate = result
+	return nil
+}
+
+func (u *uriTempalte) updateUriParams(md *desc.MessageDescriptor, prefix string, fmap map[string]bool, paramAdded bool) bool {
+	var jointer string
+	for _, mf := range md.GetFields() {
+		if fmap[mf.GetName()] || prefix != "" { // The first level field which are not consumed or the second level field
+			if mf.GetType() == dpb.FieldDescriptorProto_TYPE_MESSAGE {
+				paramAdded = u.updateUriParams(mf.GetMessageType(), prefix+mf.GetName()+".", fmap, paramAdded)
+				continue
+			}
+			if !paramAdded {
+				paramAdded = true
+				jointer = "?"
+			} else {
+				jointer = "&"
+			}
+			u.Template = fmt.Sprintf("%s%s%s%s=%s", u.Template, jointer, prefix, mf.GetName(), "%s")
+			u.Fields = append(u.Fields, &field{name: prefix + mf.GetName()})
+		}
+	}
+	return paramAdded
+}

+ 140 - 0
services/schema_http_test.go

@@ -0,0 +1,140 @@
+package services
+
+import (
+	"github.com/emqx/kuiper/common"
+	"net/http"
+	"reflect"
+	"testing"
+)
+
+func TestBookstoreConvertHttpMapping(t *testing.T) {
+	tests := []struct {
+		method string
+		params []interface{}
+		result *httpConnMeta
+		err    string
+	}{
+		{ // 0 create book
+			method: "CreateBook",
+			params: []interface{}{
+				1984,
+				map[string]interface{}{
+					"id":     20210519,
+					"author": "Conan Doyle",
+					"title":  "Sherlock Holmes",
+				},
+			},
+			// int64 will be marshaled to string!
+			result: &httpConnMeta{
+				Method: http.MethodPost,
+				Uri:    "/v1/shelves/1984/books",
+				Body:   []byte(`{"id":"20210519","author":"Conan Doyle","title":"Sherlock Holmes"}`),
+			},
+		}, { // 2 delete book
+			method: "DeleteBook",
+			params: []interface{}{
+				1984,
+				20210519,
+			},
+			result: &httpConnMeta{
+				Method: http.MethodDelete,
+				Uri:    "/v1/shelves/1984/books/20210519",
+			},
+		}, { // 3 list shelves
+			method: "ListShelves",
+			params: []interface{}{},
+			result: &httpConnMeta{
+				Method: http.MethodGet,
+				Uri:    "/v1/shelves",
+			},
+		},
+	}
+	d, err := parse(PROTOBUFF, "http_bookstore.proto")
+	if err != nil {
+		panic(err)
+	}
+	for i, tt := range tests {
+		r, err := d.(httpMapping).ConvertHttpMapping(tt.method, tt.params)
+		if !reflect.DeepEqual(tt.err, common.Errstring(err)) {
+			t.Errorf("%d : interface error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
+		} else if tt.err == "" && !reflect.DeepEqual(tt.result, r) {
+			t.Errorf("%d \n\ninterface result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, r)
+		}
+	}
+}
+
+func TestMessagingConvertHttpMapping(t *testing.T) {
+	tests := []struct {
+		method string
+		params []interface{}
+		result *httpConnMeta
+		err    string
+	}{
+		{ // 0 get message
+			method: "GetMessage",
+			params: []interface{}{
+				"messages/123456",
+			},
+			// int64 will be marshaled to string!
+			result: &httpConnMeta{
+				Method: http.MethodGet,
+				Uri:    "/v1/messages/123456",
+			},
+		}, { // 1 get message prefix error
+			method: "GetMessage",
+			params: []interface{}{
+				"message/123456",
+			},
+			err: "invalid field name(message/123456) as http option, must have prefix messages/",
+		}, { // 2 search messages
+			method: "SearchMessage",
+			params: []interface{}{
+				"123456",
+				2,
+				map[string]interface{}{
+					"subfield": "foo",
+				},
+			},
+			result: &httpConnMeta{
+				Method: http.MethodGet,
+				Uri:    "/v1/messages/filter/123456?revision=2&sub.subfield=foo",
+			},
+		}, { // 3 update message
+			method: "UpdateMessage",
+			params: []interface{}{
+				"123456",
+				map[string]interface{}{
+					"text": "Hi!",
+				},
+			},
+			result: &httpConnMeta{
+				Method: http.MethodPut,
+				Uri:    "/v1/messages/123456",
+				Body:   []byte(`{"text":"Hi!"}`),
+			},
+		}, { // 4 patch message
+			method: "PatchMessage",
+			params: []interface{}{
+				"123456",
+				"Hi!",
+			},
+			result: &httpConnMeta{
+				Method: http.MethodPatch,
+				Uri:    "/v1/messages/123456",
+				Body:   []byte(`{"text":"Hi!"}`),
+			},
+		},
+	}
+	d, err := parse(PROTOBUFF, "http_messaging.proto")
+	if err != nil {
+		panic(err)
+	}
+	for i, tt := range tests {
+		r, err := d.(httpMapping).ConvertHttpMapping(tt.method, tt.params)
+		if !reflect.DeepEqual(tt.err, common.Errstring(err)) {
+			t.Errorf("%d : interface error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
+		} else if tt.err == "" && !reflect.DeepEqual(tt.result, r) {
+			t.Errorf("%d \n\ninterface result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, r)
+		}
+	}
+}

+ 1 - 1
services/schema_test.go

@@ -225,7 +225,7 @@ func TestConvertReturns(t *testing.T) {
 	}
 
 	for i, descriptor := range descriptors {
-		for j, tt := range tests[1:2] {
+		for j, tt := range tests {
 			r, err := descriptor.(interfaceDescriptor).ConvertReturn(tt.method, tt.ireturn)
 			if !reflect.DeepEqual(tt.ierr, common.Errstring(err)) {
 				t.Errorf("%d.%d : interface error mismatch:\n  exp=%s\n  got=%s\n\n", i, j, tt.ierr, err)

+ 44 - 0
services/test/httpSample.json

@@ -0,0 +1,44 @@
+{
+  "about": {
+    "author": {
+      "name": "EMQ",
+      "email": "contact@emqx.io",
+      "company": "EMQ Technologies Co., Ltd",
+      "website": "https://www.emqx.io"
+    },
+    "helpUrl": {
+      "en_US": "https://github.com/emqx/kuiper/blob/master/docs/en_US/plugins/functions/functions.md",
+      "zh_CN": "https://github.com/emqx/kuiper/blob/master/docs/zh_CN/plugins/functions/functions.md"
+    },
+    "description": {
+      "en_US": "Sample external services for http options",
+      "zh_CN": "示例外部函数配置,用于测试http选项"
+    }
+  },
+  "interfaces": {
+    "bookshelf": {
+      "address": "http://localhost:51234/bookshelf",
+      "protocol": "rest",
+      "options": {
+        "insecureSkipVerify": true,
+        "headers": {
+          "Accept-Charset": "utf-8"
+        }
+      },
+      "schemaType": "protobuf",
+      "schemaFile": "http_bookstore.proto",
+      "functions": [
+        {
+          "name": "createBook",
+          "serviceName": "CreateBook"
+        }
+      ]
+    },
+    "messaging": {
+      "address": "http://localhost:51234/messaging",
+      "protocol": "rest",
+      "schemaType": "protobuf",
+      "schemaFile": "http_messaging.proto"
+    }
+  }
+}

+ 166 - 0
services/test/schemas/http_bookstore.proto

@@ -0,0 +1,166 @@
+// Copyright 2016 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+syntax = "proto3";
+
+package endpoints.examples.bookstore;
+
+option java_multiple_files = true;
+option java_outer_classname = "BookstoreProto";
+option java_package = "com.google.endpoints.examples.bookstore";
+
+
+import "google/api/annotations.proto";
+import "google/protobuf/empty.proto";
+
+// A simple Bookstore API.
+//
+// The API manages shelves and books resources. Shelves contain books.
+service Bookstore {
+  // Returns a list of all shelves in the bookstore.
+  rpc ListShelves(google.protobuf.Empty) returns (ListShelvesResponse) {
+    // Define HTTP mapping.
+    // Client example (Assuming your service is hosted at the given 'DOMAIN_NAME'):
+    //   curl http://DOMAIN_NAME/v1/shelves
+    option (google.api.http) = { get: "/v1/shelves" };
+  }
+  // Creates a new shelf in the bookstore.
+  rpc CreateShelf(CreateShelfRequest) returns (Shelf) {
+    // Client example:
+    //   curl -d '{"theme":"Music"}' http://DOMAIN_NAME/v1/shelves
+    option (google.api.http) = {
+      post: "/v1/shelves"
+      body: "shelf"
+    };
+  }
+  // Returns a specific bookstore shelf.
+  rpc GetShelf(GetShelfRequest) returns (Shelf) {
+    // Client example - returns the first shelf:
+    //   curl http://DOMAIN_NAME/v1/shelves/1
+    option (google.api.http) = { get: "/v1/shelves/{shelf}" };
+  }
+  // Deletes a shelf, including all books that are stored on the shelf.
+  rpc DeleteShelf(DeleteShelfRequest) returns (google.protobuf.Empty) {
+    // Client example - deletes the second shelf:
+    //   curl -X DELETE http://DOMAIN_NAME/v1/shelves/2
+    option (google.api.http) = { delete: "/v1/shelves/{shelf}" };
+  }
+  // Returns a list of books on a shelf.
+  rpc ListBooks(ListBooksRequest) returns (ListBooksResponse) {
+    // Client example - list the books from the first shelf:
+    //   curl http://DOMAIN_NAME/v1/shelves/1/books
+    option (google.api.http) = { get: "/v1/shelves/{shelf}/books" };
+  }
+  // Creates a new book.
+  rpc CreateBook(CreateBookRequest) returns (Book) {
+    // Client example - create a new book in the first shelf:
+    //   curl -d '{"author":"foo","title":"bar"}' http://DOMAIN_NAME/v1/shelves/1/books
+    option (google.api.http) = {
+      post: "/v1/shelves/{shelf}/books"
+      body: "book"
+    };
+  }
+  // Returns a specific book.
+  rpc GetBook(GetBookRequest) returns (Book) {
+    // Client example - get the first book from the second shelf:
+    //   curl http://DOMAIN_NAME/v1/shelves/2/books/1
+    option (google.api.http) = { get: "/v1/shelves/{shelf}/books/{book}" };
+  }
+  // Deletes a book from a shelf.
+  rpc DeleteBook(DeleteBookRequest) returns (google.protobuf.Empty) {
+    // Client example - delete the first book from the first shelf:
+    //   curl -X DELETE http://DOMAIN_NAME/v1/shelves/1/books/1
+    option (google.api.http) = { delete: "/v1/shelves/{shelf}/books/{book}" };
+  }
+}
+
+// A shelf resource.
+message Shelf {
+  // A unique shelf id.
+  int64 id = 1;
+  // A theme of the shelf (fiction, poetry, etc).
+  string theme = 2;
+}
+
+// A book resource.
+message Book {
+  // A unique book id.
+  int64 id = 1;
+  // An author of the book.
+  string author = 2;
+  // A book title.
+  string title = 3;
+}
+
+// Response to ListShelves call.
+message ListShelvesResponse {
+  // Shelves in the bookstore.
+  repeated Shelf shelves = 1;
+}
+
+// Request message for CreateShelf method.
+message CreateShelfRequest {
+  // The shelf resource to create.
+  Shelf shelf = 1;
+}
+
+// Request message for GetShelf method.
+message GetShelfRequest {
+  // The ID of the shelf resource to retrieve.
+  int64 shelf = 1;
+}
+
+// Request message for DeleteShelf method.
+message DeleteShelfRequest {
+  // The ID of the shelf to delete.
+  int64 shelf = 1;
+}
+
+// Request message for ListBooks method.
+message ListBooksRequest {
+  // ID of the shelf which books to list.
+  int64 shelf = 1;
+}
+
+// Response message to ListBooks method.
+message ListBooksResponse {
+  // The books on the shelf.
+  repeated Book books = 1;
+}
+
+// Request message for CreateBook method.
+message CreateBookRequest {
+  // The ID of the shelf on which to create a book.
+  int64 shelf = 1;
+  // A book resource to create on the shelf.
+  Book book = 2;
+}
+
+// Request message for GetBook method.
+message GetBookRequest {
+  // The ID of the shelf from which to retrieve a book.
+  int64 shelf = 1;
+  // The ID of the book to retrieve.
+  int64 book = 2;
+}
+
+// Request message for DeleteBook method.
+message DeleteBookRequest {
+  // The ID of the shelf from which to delete a book.
+  int64 shelf = 1;
+  // The ID of the book to delete.
+  int64 book = 2;
+}

+ 58 - 0
services/test/schemas/http_messaging.proto

@@ -0,0 +1,58 @@
+syntax = "proto3";
+
+package messaging;
+
+import "google/api/annotations.proto";
+import "google/protobuf/empty.proto";
+
+service Messaging {
+  rpc GetMessage(GetMessageRequest) returns (Message) {
+    option (google.api.http) = {
+      get: "/v1/{name=messages/*}"
+    };
+  }
+  rpc SearchMessage(SearchMessageRequest) returns (Message) {
+    option (google.api.http) = {
+      get:"/v1/messages/filter/{message_id}"
+    };
+  }
+
+  rpc UpdateMessage(UpdateMessageRequest) returns (Message) {
+    option (google.api.http) = {
+      put: "/v1/messages/{message_id}"
+      body: "message"
+    };
+  }
+
+  rpc PatchMessage(PatchMessageRequest) returns (Message) {
+    option (google.api.http) = {
+      patch: "/v1/messages/{message_id}"
+      body: "*"
+    };
+  }
+}
+message GetMessageRequest {
+  string name = 1; // Mapped to URL path.
+}
+message Message {
+  string text = 1; // The resource content.
+}
+
+message SearchMessageRequest {
+  message SubMessage {
+    string subfield = 1;
+  }
+  string message_id = 1; // Mapped to URL path.
+  int64 revision = 2;    // Mapped to URL query parameter `revision`.
+  SubMessage sub = 3;    // Mapped to URL query parameter `sub.subfield`.
+}
+
+message UpdateMessageRequest {
+  string message_id = 1; // mapped to the URL
+  Message message = 2;   // mapped to the body
+}
+
+message PatchMessageRequest {
+  string message_id = 1;
+  string text = 2;
+}

+ 1 - 6
services/test/schemas/hw.proto

@@ -7,7 +7,6 @@ option java_outer_classname = "HelloWorldProto";
 
 package helloworld;
 
-import "google/api/annotations.proto";
 import "google/protobuf/empty.proto";
 import "google/protobuf/wrappers.proto";
 
@@ -18,11 +17,7 @@ service Greeter {
   rpc Compute (InferRequest) returns (Response) {}
   // Primitive type param. Not supported for rest json
   rpc get_feature(google.protobuf.BytesValue) returns(FeatureResponse) {}
-  rpc object_detection(ObjectDetectionRequest) returns(ObjectDetectionResponse) {
-    option (google.api.http) = {
-      post: "/v1/object_detection/"
-    };
-  }
+  rpc object_detection(ObjectDetectionRequest) returns(ObjectDetectionResponse) {}
   rpc getStatus(google.protobuf.Empty) returns(google.protobuf.BoolValue) {}
   rpc RestEncodedJson(google.protobuf.StringValue) returns(google.protobuf.StringValue) {}
 }

+ 75 - 0
xstream/topotest/mock_topo.go

@@ -1003,6 +1003,73 @@ var testData = map[string][]*xsql.Tuple{
 			Timestamp: 1541152488003,
 		},
 	},
+	"shelves": {
+		{
+			Emitter: "shelves",
+			Message: map[string]interface{}{
+				"name": "name1",
+				"size": 2,
+				"shelf": map[string]interface{}{
+					"id":       1541152486013,
+					"theme":    "tandra",
+					"subfield": "sub1",
+				},
+			},
+			Timestamp: 1541152486501,
+		},
+		{
+			Emitter: "shelves",
+			Message: map[string]interface{}{
+				"name": "name2",
+				"size": 3,
+				"shelf": map[string]interface{}{
+					"id":       1541152486822,
+					"theme":    "claro",
+					"subfield": "sub2",
+				},
+			},
+			Timestamp: 1541152486502,
+		},
+		{
+			Emitter: "shelves",
+			Message: map[string]interface{}{
+				"name": "name3",
+				"size": 4,
+				"shelf": map[string]interface{}{
+					"id":       1541152487632,
+					"theme":    "dark",
+					"subfield": "sub3",
+				},
+			},
+			Timestamp: 1541152488001,
+		},
+	},
+	"mes": {
+		{
+			Emitter: "mes",
+			Message: map[string]interface{}{
+				"message_id": "1541152486013",
+				"text":       "message1",
+			},
+			Timestamp: 1541152486501,
+		},
+		{
+			Emitter: "mes",
+			Message: map[string]interface{}{
+				"message_id": "1541152486501",
+				"text":       "message2",
+			},
+			Timestamp: 1541152486501,
+		},
+		{
+			Emitter: "mes",
+			Message: map[string]interface{}{
+				"message_id": "1541152487013",
+				"text":       "message3",
+			},
+			Timestamp: 1541152487501,
+		},
+	},
 }
 
 func commonResultFunc(result [][]byte) interface{} {
@@ -1268,6 +1335,14 @@ func HandleStream(createOrDrop bool, names []string, t *testing.T) {
 				sql = `CREATE STREAM commands (cmd string, base64_img string, encoded_json string) WITH (DATASOURCE="commands", FORMAT="JSON")`
 			case "fakeBin":
 				sql = "CREATE STREAM fakeBin () WITH (DATASOURCE=\"users\", FORMAT=\"BINARY\")"
+			case "shelves":
+				sql = `CREATE STREAM shelves (
+					name string,
+					size BIGINT,
+					shelf STRUCT(theme STRING,id BIGINT, subfield STRING)
+				) WITH (DATASOURCE="shelves", FORMAT="json");`
+			case "mes":
+				sql = `CREATE STREAM mes (message_id string, text string) WITH (DATASOURCE="mes", FORMAT="JSON")`
 			default:
 				t.Errorf("create stream %s fail", name)
 			}