external_service_rule_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658
  1. package services
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. pb "github.com/emqx/kuiper/services/test/schemas/helloworld"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/topotest"
  10. "github.com/golang/protobuf/ptypes/empty"
  11. "github.com/golang/protobuf/ptypes/wrappers"
  12. "github.com/msgpack-rpc/msgpack-rpc-go/rpc"
  13. "google.golang.org/grpc"
  14. "io"
  15. "net"
  16. "net/http"
  17. "net/http/httptest"
  18. "reflect"
  19. "testing"
  20. )
  21. type HelloRequest struct {
  22. Name string `json:"name,omitempty"`
  23. }
  24. type HelloReply struct {
  25. Message string `json:"message,omitempty"`
  26. }
  27. type ObjectDetectRequest struct {
  28. Command string `json:"cmd,omitempty"`
  29. Image string `json:"base64_img,omitempty"`
  30. }
  31. type ObjectDetectResponse struct {
  32. Info string `json:"cmd,omitempty"`
  33. Code int `json:"base64_img,omitempty"`
  34. Image string `json:"image,omitempty"`
  35. Result string `json:"result,omitempty"`
  36. Type string `json:"type,omitempty"`
  37. }
  38. //type Box struct {
  39. // X int32 `json:"x,omitempty"`
  40. // Y int32 `json:"y,omitempty"`
  41. // W int32 `json:"w,omitempty"`
  42. // H int32 `json:"h,omitempty"`
  43. //}
  44. //type FeatureResult struct {
  45. // Features []float64 `json:"features,omitempty"`
  46. // Box Box `json:"box,omitempty"`
  47. //}
  48. type EncodedRequest struct {
  49. Name string `json:"name,omitempty"`
  50. Size int `json:"size,omitempty"`
  51. }
  52. func TestRestService(t *testing.T) {
  53. // mock server, the port is set in the sample.json
  54. l, err := net.Listen("tcp", "127.0.0.1:51234")
  55. if err != nil {
  56. t.Error(err)
  57. t.FailNow()
  58. }
  59. count := 0
  60. server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  61. path := r.URL.Path
  62. defer r.Body.Close()
  63. var (
  64. out interface{}
  65. )
  66. switch path {
  67. case "/SayHello":
  68. body := &HelloRequest{}
  69. err := json.NewDecoder(r.Body).Decode(body)
  70. if err != nil {
  71. http.Error(w, err.Error(), http.StatusBadRequest)
  72. }
  73. out = &HelloReply{Message: body.Name}
  74. case "/object_detection":
  75. req := &ObjectDetectRequest{}
  76. err := json.NewDecoder(r.Body).Decode(req)
  77. if err != nil {
  78. http.Error(w, err.Error(), http.StatusBadRequest)
  79. }
  80. if req.Image == "" {
  81. http.Error(w, "image is not found", http.StatusBadRequest)
  82. }
  83. out = &ObjectDetectResponse{
  84. Info: req.Command,
  85. Code: 200,
  86. Image: req.Image,
  87. Result: req.Command + " success",
  88. Type: "S",
  89. }
  90. case "/getStatus":
  91. r := count%2 == 0
  92. count++
  93. io.WriteString(w, fmt.Sprintf("%v", r))
  94. return
  95. case "/RestEncodedJson":
  96. req := &EncodedRequest{}
  97. err := json.NewDecoder(r.Body).Decode(req)
  98. if err != nil {
  99. http.Error(w, err.Error(), http.StatusBadRequest)
  100. }
  101. io.WriteString(w, req.Name)
  102. return
  103. default:
  104. http.Error(w, "path not supported", http.StatusBadRequest)
  105. }
  106. w.Header().Add("Content-Type", "application/json")
  107. enc := json.NewEncoder(w)
  108. err = enc.Encode(out)
  109. // Problems encoding
  110. if err != nil {
  111. http.Error(w, err.Error(), http.StatusBadRequest)
  112. }
  113. }))
  114. server.Listener.Close()
  115. server.Listener = l
  116. // Start the server.
  117. server.Start()
  118. defer server.Close()
  119. //Reset
  120. streamList := []string{"helloStr", "commands", "fakeBin"}
  121. topotest.HandleStream(false, streamList, t)
  122. //Data setup
  123. var tests = []topotest.RuleTest{
  124. {
  125. Name: `TestRestRule1`,
  126. Sql: `SELECT helloFromRest(name) as wc FROM helloStr`,
  127. R: [][]map[string]interface{}{
  128. {{
  129. "wc": map[string]interface{}{
  130. "message": "world",
  131. },
  132. }},
  133. {{
  134. "wc": map[string]interface{}{
  135. "message": "golang",
  136. },
  137. }},
  138. {{
  139. "wc": map[string]interface{}{
  140. "message": "peacock",
  141. },
  142. }},
  143. },
  144. M: map[string]interface{}{
  145. "op_2_project_0_exceptions_total": int64(0),
  146. "op_2_project_0_process_latency_us": int64(0),
  147. "op_2_project_0_records_in_total": int64(3),
  148. "op_2_project_0_records_out_total": int64(3),
  149. "sink_mockSink_0_exceptions_total": int64(0),
  150. "sink_mockSink_0_records_in_total": int64(3),
  151. "sink_mockSink_0_records_out_total": int64(3),
  152. },
  153. }, {
  154. Name: `TestRestRule2`,
  155. Sql: `SELECT objectDetectFromRest(cmd, base64_img)->result FROM commands`,
  156. R: [][]map[string]interface{}{
  157. {{
  158. "kuiper_field_0": "get success",
  159. }},
  160. {{
  161. "kuiper_field_0": "detect success",
  162. }},
  163. {{
  164. "kuiper_field_0": "delete success",
  165. }},
  166. },
  167. M: map[string]interface{}{
  168. "op_2_project_0_exceptions_total": int64(0),
  169. "op_2_project_0_process_latency_us": int64(0),
  170. "op_2_project_0_records_in_total": int64(3),
  171. "op_2_project_0_records_out_total": int64(3),
  172. "sink_mockSink_0_exceptions_total": int64(0),
  173. "sink_mockSink_0_records_in_total": int64(3),
  174. "sink_mockSink_0_records_out_total": int64(3),
  175. },
  176. }, {
  177. Name: `TestRestRule3`,
  178. Sql: `SELECT objectDetectFromRest(*)->result FROM commands`,
  179. R: [][]map[string]interface{}{
  180. {{
  181. "kuiper_field_0": "get success",
  182. }},
  183. {{
  184. "kuiper_field_0": "detect success",
  185. }},
  186. {{
  187. "kuiper_field_0": "delete success",
  188. }},
  189. },
  190. M: map[string]interface{}{
  191. "op_2_project_0_exceptions_total": int64(0),
  192. "op_2_project_0_process_latency_us": int64(0),
  193. "op_2_project_0_records_in_total": int64(3),
  194. "op_2_project_0_records_out_total": int64(3),
  195. "sink_mockSink_0_exceptions_total": int64(0),
  196. "sink_mockSink_0_records_in_total": int64(3),
  197. "sink_mockSink_0_records_out_total": int64(3),
  198. },
  199. //}, {
  200. // Name: `TestRestRule3`,
  201. // Sql: `SELECT getFeatureFromRest(self)->feature[0]->box->h FROM fakeBin`,
  202. // R: [][]map[string]interface{}{
  203. // {{
  204. // "kuiper_field_0": 106,
  205. // }},
  206. // {{
  207. // "kuiper_field_0": 107,
  208. // }},
  209. // {{
  210. // "kuiper_field_0": 108,
  211. // }},
  212. // },
  213. // M: map[string]interface{}{
  214. // "op_2_project_0_exceptions_total": int64(0),
  215. // "op_2_project_0_process_latency_us": int64(0),
  216. // "op_2_project_0_records_in_total": int64(3),
  217. // "op_2_project_0_records_out_total": int64(3),
  218. //
  219. // "sink_mockSink_0_exceptions_total": int64(0),
  220. // "sink_mockSink_0_records_in_total": int64(3),
  221. // "sink_mockSink_0_records_out_total": int64(3),
  222. // },
  223. }, {
  224. Name: `TestRestRule4`,
  225. Sql: `SELECT getStatusFromRest(), cmd FROM commands`,
  226. R: [][]map[string]interface{}{
  227. {{
  228. "getStatusFromRest": true,
  229. "cmd": "get",
  230. }},
  231. {{
  232. "getStatusFromRest": false,
  233. "cmd": "detect",
  234. }},
  235. {{
  236. "getStatusFromRest": true,
  237. "cmd": "delete",
  238. }},
  239. },
  240. M: map[string]interface{}{
  241. "op_2_project_0_exceptions_total": int64(0),
  242. "op_2_project_0_process_latency_us": int64(0),
  243. "op_2_project_0_records_in_total": int64(3),
  244. "op_2_project_0_records_out_total": int64(3),
  245. "sink_mockSink_0_exceptions_total": int64(0),
  246. "sink_mockSink_0_records_in_total": int64(3),
  247. "sink_mockSink_0_records_out_total": int64(3),
  248. },
  249. }, {
  250. Name: `TestRestRule5`,
  251. Sql: `SELECT restEncodedJson(encoded_json) as name FROM commands`,
  252. R: [][]map[string]interface{}{
  253. {{
  254. "name": "name1",
  255. }},
  256. {{
  257. "name": "name2",
  258. }},
  259. {{
  260. "name": "name3",
  261. }},
  262. },
  263. M: map[string]interface{}{
  264. "op_2_project_0_exceptions_total": int64(0),
  265. "op_2_project_0_process_latency_us": int64(0),
  266. "op_2_project_0_records_in_total": int64(3),
  267. "op_2_project_0_records_out_total": int64(3),
  268. "sink_mockSink_0_exceptions_total": int64(0),
  269. "sink_mockSink_0_records_in_total": int64(3),
  270. "sink_mockSink_0_records_out_total": int64(3),
  271. },
  272. },
  273. }
  274. topotest.HandleStream(true, streamList, t)
  275. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  276. BufferLength: 100,
  277. SendError: true,
  278. }, 0)
  279. }
  280. type Resolver map[string]reflect.Value
  281. func (self Resolver) Resolve(name string, _ []reflect.Value) (reflect.Value, error) {
  282. return self[name], nil
  283. }
  284. func SayHello(name string) map[string]interface{} {
  285. return map[string]interface{}{
  286. "message": name,
  287. }
  288. }
  289. func get_feature(img []byte) map[string]interface{} {
  290. l := len(string(img))
  291. return map[string]interface{}{
  292. "feature": []map[string]interface{}{
  293. {
  294. "features": []float64{-1.444, 2.55452, 5.121},
  295. "box": map[string]interface{}{
  296. "x": 153,
  297. "y": 107,
  298. "w": 174,
  299. "h": 100 + l,
  300. },
  301. }, {
  302. "features": []float64{1.444, -2.55452, -5.121},
  303. "box": map[string]interface{}{
  304. "x": 257,
  305. "y": 92,
  306. "w": 169,
  307. "h": 208,
  308. },
  309. },
  310. },
  311. }
  312. }
  313. func object_detection(command string, image string) map[string]interface{} {
  314. out := map[string]interface{}{
  315. "info": command,
  316. "code": 200,
  317. "image": image,
  318. "result": command + " success",
  319. "type": "S",
  320. }
  321. return out
  322. }
  323. func getStatus() bool {
  324. return true
  325. }
  326. func TestMsgpackService(t *testing.T) {
  327. // mock server
  328. res := Resolver{"SayHello": reflect.ValueOf(SayHello), "object_detection": reflect.ValueOf(object_detection), "get_feature": reflect.ValueOf(get_feature), "getStatus": reflect.ValueOf(getStatus)}
  329. serv := rpc.NewServer(res, true, nil)
  330. l, _ := net.Listen("tcp", ":50000")
  331. serv.Listen(l)
  332. go serv.Run()
  333. // Comment out because the bug in the msgpack rpc
  334. // defer serv.Stop()
  335. //Reset
  336. streamList := []string{"helloStr", "commands", "fakeBin"}
  337. topotest.HandleStream(false, streamList, t)
  338. //Data setup
  339. var tests = []topotest.RuleTest{
  340. {
  341. Name: `TestRestRule1`,
  342. Sql: `SELECT helloFromMsgpack(name) as wc FROM helloStr`,
  343. R: [][]map[string]interface{}{
  344. {{
  345. "wc": map[string]interface{}{
  346. "message": "world",
  347. },
  348. }},
  349. {{
  350. "wc": map[string]interface{}{
  351. "message": "golang",
  352. },
  353. }},
  354. {{
  355. "wc": map[string]interface{}{
  356. "message": "peacock",
  357. },
  358. }},
  359. },
  360. M: map[string]interface{}{
  361. "op_2_project_0_exceptions_total": int64(0),
  362. "op_2_project_0_process_latency_us": int64(0),
  363. "op_2_project_0_records_in_total": int64(3),
  364. "op_2_project_0_records_out_total": int64(3),
  365. "sink_mockSink_0_exceptions_total": int64(0),
  366. "sink_mockSink_0_records_in_total": int64(3),
  367. "sink_mockSink_0_records_out_total": int64(3),
  368. },
  369. }, {
  370. Name: `TestRestRule2`,
  371. Sql: `SELECT objectDetectFromMsgpack(*)->result FROM commands`,
  372. R: [][]map[string]interface{}{
  373. {{
  374. "kuiper_field_0": "get success",
  375. }},
  376. {{
  377. "kuiper_field_0": "detect success",
  378. }},
  379. {{
  380. "kuiper_field_0": "delete success",
  381. }},
  382. },
  383. M: map[string]interface{}{
  384. "op_2_project_0_exceptions_total": int64(0),
  385. "op_2_project_0_process_latency_us": int64(0),
  386. "op_2_project_0_records_in_total": int64(3),
  387. "op_2_project_0_records_out_total": int64(3),
  388. "sink_mockSink_0_exceptions_total": int64(0),
  389. "sink_mockSink_0_records_in_total": int64(3),
  390. "sink_mockSink_0_records_out_total": int64(3),
  391. },
  392. }, {
  393. Name: `TestRestRule3`,
  394. Sql: `SELECT getFeatureFromMsgpack(self)->feature[0]->box->h FROM fakeBin`,
  395. R: [][]map[string]interface{}{
  396. {{
  397. "kuiper_field_0": float64(106), //Convert by the testing tool
  398. }},
  399. {{
  400. "kuiper_field_0": float64(107),
  401. }},
  402. {{
  403. "kuiper_field_0": float64(108),
  404. }},
  405. },
  406. M: map[string]interface{}{
  407. "op_2_project_0_exceptions_total": int64(0),
  408. "op_2_project_0_process_latency_us": int64(0),
  409. "op_2_project_0_records_in_total": int64(3),
  410. "op_2_project_0_records_out_total": int64(3),
  411. "sink_mockSink_0_exceptions_total": int64(0),
  412. "sink_mockSink_0_records_in_total": int64(3),
  413. "sink_mockSink_0_records_out_total": int64(3),
  414. },
  415. //}, {
  416. // Name: `TestRestRule4`,
  417. // Sql: `SELECT getStatusFromMsgpack(), command FROM commands`,
  418. // R: [][]map[string]interface{}{
  419. // {{
  420. // "getStatusFromRest": true,
  421. // "command": "get",
  422. // }},
  423. // {{
  424. // "getStatusFromRest": true,
  425. // "command": "detect",
  426. // }},
  427. // {{
  428. // "getStatusFromRest": true,
  429. // "command": "delete",
  430. // }},
  431. // },
  432. // M: map[string]interface{}{
  433. // "op_2_project_0_exceptions_total": int64(0),
  434. // "op_2_project_0_process_latency_us": int64(0),
  435. // "op_2_project_0_records_in_total": int64(3),
  436. // "op_2_project_0_records_out_total": int64(3),
  437. //
  438. // "sink_mockSink_0_exceptions_total": int64(0),
  439. // "sink_mockSink_0_records_in_total": int64(3),
  440. // "sink_mockSink_0_records_out_total": int64(3),
  441. // },
  442. },
  443. }
  444. topotest.HandleStream(true, streamList, t)
  445. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  446. BufferLength: 100,
  447. SendError: true,
  448. }, 0)
  449. }
  450. type server struct {
  451. pb.UnimplementedGreeterServer
  452. }
  453. func (s *server) SayHello(_ context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  454. return &pb.HelloReply{Message: in.GetName()}, nil
  455. }
  456. func (s *server) ObjectDetection(_ context.Context, in *pb.ObjectDetectionRequest) (*pb.ObjectDetectionResponse, error) {
  457. return &pb.ObjectDetectionResponse{
  458. Info: in.Cmd,
  459. Code: 200,
  460. Image: in.Base64Img,
  461. Result: in.Cmd + " success",
  462. Type: "S",
  463. }, nil
  464. }
  465. func (s *server) GetFeature(_ context.Context, v *wrappers.BytesValue) (*pb.FeatureResponse, error) {
  466. l := len(string(v.Value))
  467. return &pb.FeatureResponse{
  468. Feature: []*pb.FeatureResult{
  469. {
  470. Features: []float32{-1.444, 2.55452, 5.121},
  471. Box: &pb.Box{
  472. X: 153,
  473. Y: 107,
  474. W: 174,
  475. H: int32(100 + l),
  476. },
  477. },
  478. {
  479. Features: []float32{1.444, -2.55452, -5.121},
  480. Box: &pb.Box{
  481. X: 257,
  482. Y: 92,
  483. W: 169,
  484. H: 208,
  485. },
  486. },
  487. },
  488. }, nil
  489. }
  490. func (s *server) GetStatus(context.Context, *empty.Empty) (*wrappers.BoolValue, error) {
  491. return &wrappers.BoolValue{Value: true}, nil
  492. }
  493. func TestGrpcService(t *testing.T) {
  494. lis, err := net.Listen("tcp", ":50051")
  495. if err != nil {
  496. common.Log.Fatalf("failed to listen: %v", err)
  497. }
  498. s := grpc.NewServer()
  499. pb.RegisterGreeterServer(s, &server{})
  500. go func() {
  501. if err := s.Serve(lis); err != nil {
  502. common.Log.Fatalf("failed to serve: %v", err)
  503. }
  504. }()
  505. defer s.Stop()
  506. //Reset
  507. streamList := []string{"helloStr", "commands", "fakeBin"}
  508. topotest.HandleStream(false, streamList, t)
  509. //Data setup
  510. var tests = []topotest.RuleTest{
  511. {
  512. Name: `TestRestRule1`,
  513. Sql: `SELECT helloFromGrpc(name) as wc FROM helloStr`,
  514. R: [][]map[string]interface{}{
  515. {{
  516. "wc": map[string]interface{}{
  517. "message": "world",
  518. },
  519. }},
  520. {{
  521. "wc": map[string]interface{}{
  522. "message": "golang",
  523. },
  524. }},
  525. {{
  526. "wc": map[string]interface{}{
  527. "message": "peacock",
  528. },
  529. }},
  530. },
  531. M: map[string]interface{}{
  532. "op_2_project_0_exceptions_total": int64(0),
  533. "op_2_project_0_process_latency_us": int64(0),
  534. "op_2_project_0_records_in_total": int64(3),
  535. "op_2_project_0_records_out_total": int64(3),
  536. "sink_mockSink_0_exceptions_total": int64(0),
  537. "sink_mockSink_0_records_in_total": int64(3),
  538. "sink_mockSink_0_records_out_total": int64(3),
  539. },
  540. }, {
  541. Name: `TestRestRule2`,
  542. Sql: `SELECT objectDetectFromGrpc(cmd, base64_img)->result FROM commands`,
  543. R: [][]map[string]interface{}{
  544. {{
  545. "kuiper_field_0": "get success",
  546. }},
  547. {{
  548. "kuiper_field_0": "detect success",
  549. }},
  550. {{
  551. "kuiper_field_0": "delete success",
  552. }},
  553. },
  554. M: map[string]interface{}{
  555. "op_2_project_0_exceptions_total": int64(0),
  556. "op_2_project_0_process_latency_us": int64(0),
  557. "op_2_project_0_records_in_total": int64(3),
  558. "op_2_project_0_records_out_total": int64(3),
  559. "sink_mockSink_0_exceptions_total": int64(0),
  560. "sink_mockSink_0_records_in_total": int64(3),
  561. "sink_mockSink_0_records_out_total": int64(3),
  562. },
  563. }, {
  564. Name: `TestRestRule3`,
  565. Sql: `SELECT getFeatureFromGrpc(self)->feature[0]->box->h FROM fakeBin`,
  566. R: [][]map[string]interface{}{
  567. {{
  568. "kuiper_field_0": float64(106), //Convert by the testing tool
  569. }},
  570. {{
  571. "kuiper_field_0": float64(107),
  572. }},
  573. {{
  574. "kuiper_field_0": float64(108),
  575. }},
  576. },
  577. M: map[string]interface{}{
  578. "op_2_project_0_exceptions_total": int64(0),
  579. "op_2_project_0_process_latency_us": int64(0),
  580. "op_2_project_0_records_in_total": int64(3),
  581. "op_2_project_0_records_out_total": int64(3),
  582. "sink_mockSink_0_exceptions_total": int64(0),
  583. "sink_mockSink_0_records_in_total": int64(3),
  584. "sink_mockSink_0_records_out_total": int64(3),
  585. },
  586. }, {
  587. Name: `TestRestRule4`,
  588. Sql: `SELECT getStatusFromGrpc(), cmd FROM commands`,
  589. R: [][]map[string]interface{}{
  590. {{
  591. "getStatusFromGrpc": true,
  592. "cmd": "get",
  593. }},
  594. {{
  595. "getStatusFromGrpc": true,
  596. "cmd": "detect",
  597. }},
  598. {{
  599. "getStatusFromGrpc": true,
  600. "cmd": "delete",
  601. }},
  602. },
  603. M: map[string]interface{}{
  604. "op_2_project_0_exceptions_total": int64(0),
  605. "op_2_project_0_process_latency_us": int64(0),
  606. "op_2_project_0_records_in_total": int64(3),
  607. "op_2_project_0_records_out_total": int64(3),
  608. "sink_mockSink_0_exceptions_total": int64(0),
  609. "sink_mockSink_0_records_in_total": int64(3),
  610. "sink_mockSink_0_records_out_total": int64(3),
  611. },
  612. },
  613. }
  614. topotest.HandleStream(true, streamList, t)
  615. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  616. BufferLength: 100,
  617. SendError: true,
  618. }, 0)
  619. }