|
- // Copyright 2021-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net"
- "net/http"
- "net/http/httptest"
- "reflect"
- "strconv"
- "testing"
- "github.com/golang/protobuf/ptypes/empty"
- "github.com/golang/protobuf/ptypes/wrappers"
- "github.com/gorilla/mux"
- "google.golang.org/grpc"
- kconf "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/topo/topotest"
- "github.com/lf-edge/ekuiper/pkg/api"
- )
- type RestHelloRequest struct {
- Name string `json:"name,omitempty"`
- }
- type RestHelloReply struct {
- Message string `json:"message,omitempty"`
- }
- type ObjectDetectRequest struct {
- Command string `json:"cmd,omitempty"`
- Image string `json:"base64_img,omitempty"`
- }
- type ObjectDetectResponse struct {
- Info string `json:"cmd,omitempty"`
- Code int `json:"base64_img,omitempty"`
- Image string `json:"image,omitempty"`
- Result string `json:"result,omitempty"`
- Type string `json:"type,omitempty"`
- }
- //type Box struct {
- // X int32 `json:"x,omitempty"`
- // Y int32 `json:"y,omitempty"`
- // W int32 `json:"w,omitempty"`
- // H int32 `json:"h,omitempty"`
- //}
- //type FeatureResult struct {
- // Features []float64 `json:"features,omitempty"`
- // Box Box `json:"box,omitempty"`
- //}
- type EncodedRequest struct {
- Name string `json:"name,omitempty"`
- Size int `json:"size,omitempty"`
- }
- type ShelfMessage struct {
- Id string `json:"id,omitempty"`
- Theme string `json:"theme,omitempty"`
- }
- type ShelfMessageOut struct {
- Id int64 `json:"id,omitempty"`
- Theme string `json:"theme,omitempty"`
- }
- type BookMessage struct {
- Id int64 `json:"id,omitempty"`
- Author string `json:"author,omitempty"`
- Title string `json:"title,omitempty"`
- }
- type MessageMessage struct {
- Text string `json:"text,omitempty"`
- }
- func TestRestService(t *testing.T) {
- // mock server, the port is set in the sample.json
- l, err := net.Listen("tcp", "127.0.0.1:51234")
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- count := 0
- router := mux.NewRouter()
- router.HandleFunc("/SayHello", func(w http.ResponseWriter, r *http.Request) {
- body := &RestHelloRequest{}
- err := json.NewDecoder(r.Body).Decode(body)
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- }
- out := &RestHelloReply{Message: body.Name}
- jsonOut(w, out)
- }).Methods(http.MethodPost)
- router.HandleFunc("/object_detection", func(w http.ResponseWriter, r *http.Request) {
- req := &ObjectDetectRequest{}
- err := json.NewDecoder(r.Body).Decode(req)
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- }
- if req.Image == "" {
- http.Error(w, "image is not found", http.StatusBadRequest)
- }
- out := &ObjectDetectResponse{
- Info: req.Command,
- Code: 200,
- Image: req.Image,
- Result: req.Command + " success",
- Type: "S",
- }
- jsonOut(w, out)
- }).Methods(http.MethodPost)
- router.HandleFunc("/getStatus", func(w http.ResponseWriter, r *http.Request) {
- result := count%2 == 0
- count++
- io.WriteString(w, fmt.Sprintf("%v", result))
- }).Methods(http.MethodPost)
- router.HandleFunc("/RestEncodedJson", func(w http.ResponseWriter, r *http.Request) {
- req := &EncodedRequest{}
- err := json.NewDecoder(r.Body).Decode(req)
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- }
- io.WriteString(w, req.Name)
- }).Methods(http.MethodPost)
- router.HandleFunc("/bookshelf/v1/shelves", func(w http.ResponseWriter, r *http.Request) {
- req := &ShelfMessage{}
- err := json.NewDecoder(r.Body).Decode(req)
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- }
- if req.Id == "" || req.Theme == "" {
- http.Error(w, "empty request", http.StatusBadRequest)
- }
- idint, _ := strconv.Atoi(req.Id)
- out := ShelfMessageOut{Id: int64(idint), Theme: req.Theme}
- jsonOut(w, out)
- }).Methods(http.MethodPost)
- router.HandleFunc("/bookshelf/v1/shelves/{shelf}/books/{book}", func(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- shelf, book := vars["shelf"], vars["book"]
- if shelf == "" || book == "" {
- http.Error(w, "empty request", http.StatusBadRequest)
- }
- idint, _ := strconv.Atoi(book)
- out := BookMessage{Id: int64(idint), Author: "NA", Title: "title_" + book}
- jsonOut(w, out)
- }).Methods(http.MethodGet)
- router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- name := vars["name"]
- if name == "" {
- http.Error(w, "empty request", http.StatusBadRequest)
- }
- out := MessageMessage{Text: name + " content"}
- jsonOut(w, out)
- }).Methods(http.MethodGet)
- router.HandleFunc("/messaging/v1/messages/filter/{name}", func(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- name := vars["name"]
- q := r.URL.Query()
- rev, sub := q.Get("revision"), q.Get("sub.subfield")
- if name == "" || rev == "" || sub == "" {
- http.Error(w, "empty request", http.StatusBadRequest)
- }
- out := MessageMessage{Text: name + rev + sub}
- jsonOut(w, out)
- }).Methods(http.MethodGet)
- router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- name := vars["name"]
- if name == "" {
- http.Error(w, "empty request", http.StatusBadRequest)
- }
- body := &MessageMessage{}
- err := json.NewDecoder(r.Body).Decode(body)
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- }
- out := MessageMessage{Text: body.Text}
- jsonOut(w, out)
- }).Methods(http.MethodPut, http.MethodPatch)
- server := httptest.NewUnstartedServer(router)
- server.Listener.Close()
- server.Listener = l
- // Start the server.
- server.Start()
- defer server.Close()
- // Reset
- streamList := []string{"helloStr", "commands", "fakeBin", "shelves", "demo", "mes"}
- topotest.HandleStream(false, streamList, t)
- // Data setup
- tests := []topotest.RuleTest{
- {
- Name: `TestRestRule1`,
- Sql: `SELECT helloFromRest(name) as wc FROM helloStr`,
- R: [][]map[string]interface{}{
- {{
- "wc": map[string]interface{}{
- "message": "world",
- },
- }},
- {{
- "wc": map[string]interface{}{
- "message": "golang",
- },
- }},
- {{
- "wc": map[string]interface{}{
- "message": "peacock",
- },
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- }, {
- Name: `TestRestRule2`,
- Sql: `SELECT objectDetectFromRest(cmd, base64_img)->result FROM commands`,
- R: [][]map[string]interface{}{
- {{
- "kuiper_field_0": "get success",
- }},
- {{
- "kuiper_field_0": "detect success",
- }},
- {{
- "kuiper_field_0": "delete success",
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- }, {
- Name: `TestRestRule3`,
- Sql: `SELECT objectDetectFromRest(*)->result FROM commands`,
- R: [][]map[string]interface{}{
- {{
- "kuiper_field_0": "get success",
- }},
- {{
- "kuiper_field_0": "detect success",
- }},
- {{
- "kuiper_field_0": "delete success",
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- //}, {
- // Name: `TestRestRule3`,
- // Sql: `SELECT getFeatureFromRest(self)->feature[0]->box->h FROM fakeBin`,
- // R: [][]map[string]interface{}{
- // {{
- // "kuiper_field_0": 106,
- // }},
- // {{
- // "kuiper_field_0": 107,
- // }},
- // {{
- // "kuiper_field_0": 108,
- // }},
- // },
- // M: map[string]interface{}{
- // "op_2_project_0_exceptions_total": int64(0),
- // "op_2_project_0_process_latency_us": int64(0),
- // "op_2_project_0_records_in_total": int64(3),
- // "op_2_project_0_records_out_total": int64(3),
- //
- // "sink_mockSink_0_exceptions_total": int64(0),
- // "sink_mockSink_0_records_in_total": int64(3),
- // "sink_mockSink_0_records_out_total": int64(3),
- // },
- }, {
- Name: `TestRestRule4`,
- Sql: `SELECT getStatusFromRest(), cmd FROM commands`,
- R: [][]map[string]interface{}{
- {{
- "getStatusFromRest": true,
- "cmd": "get",
- }},
- {{
- "getStatusFromRest": false,
- "cmd": "detect",
- }},
- {{
- "getStatusFromRest": true,
- "cmd": "delete",
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- }, {
- Name: `TestRestRule5`,
- Sql: `SELECT restEncodedJson(encoded_json) as name FROM commands`,
- R: [][]map[string]interface{}{
- {{
- "name": "name1",
- }},
- {{
- "name": "name2",
- }},
- {{
- "name": "name3",
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- }, {
- Name: `TestRestRule6`,
- Sql: `SELECT CreateShelf(shelf)->theme as theme FROM shelves`,
- R: [][]map[string]interface{}{
- {{
- "theme": "tandra",
- }},
- {{
- "theme": "claro",
- }},
- {{
- "theme": "dark",
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- }, {
- Name: `TestRestRule7`,
- Sql: `SELECT GetBook(size, ts)->title as title FROM demo WHERE size > 3 `,
- R: [][]map[string]interface{}{
- {{
- "title": "title_1541152486822",
- }},
- {{
- "title": "title_1541152488442",
- }},
- },
- M: map[string]interface{}{
- "op_2_filter_0_exceptions_total": int64(0),
- "op_2_filter_0_process_latency_us": int64(0),
- "op_2_filter_0_records_in_total": int64(5),
- "op_2_filter_0_records_out_total": int64(2),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(2),
- "sink_mockSink_0_records_out_total": int64(2),
- },
- }, {
- Name: `TestRestRule8`,
- Sql: `SELECT GetMessage(concat("messages/",ts))->text as message FROM demo WHERE size > 3`,
- R: [][]map[string]interface{}{
- {{
- "message": "1541152486822 content",
- }},
- {{
- "message": "1541152488442 content",
- }},
- },
- M: map[string]interface{}{
- "op_2_filter_0_exceptions_total": int64(0),
- "op_2_filter_0_process_latency_us": int64(0),
- "op_2_filter_0_records_in_total": int64(5),
- "op_2_filter_0_records_out_total": int64(2),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(2),
- "sink_mockSink_0_records_out_total": int64(2),
- },
- }, {
- Name: `TestRestRule9`,
- Sql: `SELECT SearchMessage(name, size, shelf)->text as message FROM shelves`,
- R: [][]map[string]interface{}{
- {{
- "message": "name12sub1",
- }},
- {{
- "message": "name23sub2",
- }},
- {{
- "message": "name34sub3",
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- // TODO support * as one of the parameters
- //},{
- // Name: `TestRestRule10`,
- // Sql: `SELECT UpdateMessage(message_id, *)->text as message FROM mes`,
- // R: [][]map[string]interface{}{
- // {{
- // "message": "message1",
- // }},
- // {{
- // "message": "message2",
- // }},
- // {{
- // "message": "message3",
- // }},
- // },
- // M: map[string]interface{}{
- // "op_2_project_0_exceptions_total": int64(0),
- // "op_2_project_0_process_latency_us": int64(0),
- // "op_2_project_0_records_in_total": int64(3),
- // "op_2_project_0_records_out_total": int64(3),
- //
- // "sink_mockSink_0_exceptions_total": int64(0),
- // "sink_mockSink_0_records_in_total": int64(3),
- // "sink_mockSink_0_records_out_total": int64(3),
- // },
- }, {
- Name: `TestRestRule11`,
- Sql: `SELECT PatchMessage(message_id, text)->text as message FROM mes`,
- R: [][]map[string]interface{}{
- {{
- "message": "message1",
- }},
- {{
- "message": "message2",
- }},
- {{
- "message": "message3",
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- },
- }
- topotest.HandleStream(true, streamList, t)
- topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
- BufferLength: 100,
- SendError: true,
- }, 0)
- }
- func jsonOut(w http.ResponseWriter, out interface{}) {
- w.Header().Add("Content-Type", "application/json")
- enc := json.NewEncoder(w)
- err := enc.Encode(out)
- // Problems encoding
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- }
- }
- type Resolver map[string]reflect.Value
- func (r Resolver) Resolve(name string, _ []reflect.Value) (reflect.Value, error) {
- return r[name], nil
- }
- func SayHello(name string) map[string]interface{} {
- return map[string]interface{}{
- "message": name,
- }
- }
- func get_feature(img []byte) map[string]interface{} {
- l := len(string(img))
- return map[string]interface{}{
- "feature": []map[string]interface{}{
- {
- "features": []float64{-1.444, 2.55452, 5.121},
- "box": map[string]interface{}{
- "x": 153,
- "y": 107,
- "w": 174,
- "h": 100 + l,
- },
- }, {
- "features": []float64{1.444, -2.55452, -5.121},
- "box": map[string]interface{}{
- "x": 257,
- "y": 92,
- "w": 169,
- "h": 208,
- },
- },
- },
- }
- }
- func object_detection(command string, image string) map[string]interface{} {
- out := map[string]interface{}{
- "info": command,
- "code": 200,
- "image": image,
- "result": command + " success",
- "type": "S",
- }
- return out
- }
- func getStatus() bool {
- return true
- }
- type server struct {
- UnimplementedGreeterServer
- }
- func (s *server) SayHello(_ context.Context, in *HelloRequest) (*HelloReply, error) {
- return &HelloReply{Message: in.GetName()}, nil
- }
- func (s *server) ObjectDetection(_ context.Context, in *ObjectDetectionRequest) (*ObjectDetectionResponse, error) {
- return &ObjectDetectionResponse{
- Info: in.Cmd,
- Code: 200,
- Image: in.Base64Img,
- Result: in.Cmd + " success",
- Type: "S",
- }, nil
- }
- func (s *server) GetFeature(_ context.Context, v *wrappers.BytesValue) (*FeatureResponse, error) {
- l := len(string(v.Value))
- return &FeatureResponse{
- Feature: []*FeatureResult{
- {
- Features: []float32{-1.444, 2.55452, 5.121},
- Box: &Box{
- X: 153,
- Y: 107,
- W: 174,
- H: int32(100 + l),
- },
- },
- {
- Features: []float32{1.444, -2.55452, -5.121},
- Box: &Box{
- X: 257,
- Y: 92,
- W: 169,
- H: 208,
- },
- },
- },
- }, nil
- }
- func (s *server) GetStatus(context.Context, *empty.Empty) (*wrappers.BoolValue, error) {
- return &wrappers.BoolValue{Value: true}, nil
- }
- func TestGrpcService(t *testing.T) {
- lis, err := net.Listen("tcp", ":50051")
- if err != nil {
- kconf.Log.Fatalf("failed to listen: %v", err)
- }
- s := grpc.NewServer()
- RegisterGreeterServer(s, &server{})
- go func() {
- if err := s.Serve(lis); err != nil {
- kconf.Log.Fatalf("failed to serve: %v", err)
- }
- }()
- defer s.Stop()
- // Reset
- streamList := []string{"helloStr", "commands", "fakeBin"}
- topotest.HandleStream(false, streamList, t)
- // Data setup
- tests := []topotest.RuleTest{
- {
- Name: `TestRestRule1`,
- Sql: `SELECT helloFromGrpc(name) as wc FROM helloStr`,
- R: [][]map[string]interface{}{
- {{
- "wc": map[string]interface{}{
- "message": "world",
- },
- }},
- {{
- "wc": map[string]interface{}{
- "message": "golang",
- },
- }},
- {{
- "wc": map[string]interface{}{
- "message": "peacock",
- },
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- }, {
- Name: `TestRestRule2`,
- Sql: `SELECT objectDetectFromGrpc(cmd, base64_img)->result FROM commands`,
- R: [][]map[string]interface{}{
- {{
- "kuiper_field_0": "get success",
- }},
- {{
- "kuiper_field_0": "detect success",
- }},
- {{
- "kuiper_field_0": "delete success",
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- }, {
- Name: `TestRestRule3`,
- Sql: `SELECT getFeatureFromGrpc(self)->feature[0]->box->h FROM fakeBin`,
- R: [][]map[string]interface{}{
- {{
- "kuiper_field_0": float64(106), // Convert by the testing tool
- }},
- {{
- "kuiper_field_0": float64(107),
- }},
- {{
- "kuiper_field_0": float64(108),
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- }, {
- Name: `TestRestRule4`,
- Sql: `SELECT getStatusFromGrpc(), cmd FROM commands`,
- R: [][]map[string]interface{}{
- {{
- "getStatusFromGrpc": true,
- "cmd": "get",
- }},
- {{
- "getStatusFromGrpc": true,
- "cmd": "detect",
- }},
- {{
- "getStatusFromGrpc": true,
- "cmd": "delete",
- }},
- },
- M: map[string]interface{}{
- "op_2_project_0_exceptions_total": int64(0),
- "op_2_project_0_process_latency_us": int64(0),
- "op_2_project_0_records_in_total": int64(3),
- "op_2_project_0_records_out_total": int64(3),
- "sink_mockSink_0_exceptions_total": int64(0),
- "sink_mockSink_0_records_in_total": int64(3),
- "sink_mockSink_0_records_out_total": int64(3),
- },
- },
- }
- topotest.HandleStream(true, streamList, t)
- topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
- BufferLength: 100,
- SendError: true,
- }, 0)
- }
|