external_service_rule_test.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/golang/protobuf/ptypes/empty"
  7. "github.com/golang/protobuf/ptypes/wrappers"
  8. "github.com/gorilla/mux"
  9. kconf "github.com/lf-edge/ekuiper/internal/conf"
  10. pb "github.com/lf-edge/ekuiper/internal/service/test/schemas/helloworld"
  11. "github.com/lf-edge/ekuiper/internal/topo/topotest"
  12. "github.com/lf-edge/ekuiper/pkg/api"
  13. "github.com/msgpack-rpc/msgpack-rpc-go/rpc"
  14. "google.golang.org/grpc"
  15. "io"
  16. "net"
  17. "net/http"
  18. "net/http/httptest"
  19. "reflect"
  20. "strconv"
  21. "testing"
  22. )
  23. type HelloRequest struct {
  24. Name string `json:"name,omitempty"`
  25. }
  26. type HelloReply struct {
  27. Message string `json:"message,omitempty"`
  28. }
  29. type ObjectDetectRequest struct {
  30. Command string `json:"cmd,omitempty"`
  31. Image string `json:"base64_img,omitempty"`
  32. }
  33. type ObjectDetectResponse struct {
  34. Info string `json:"cmd,omitempty"`
  35. Code int `json:"base64_img,omitempty"`
  36. Image string `json:"image,omitempty"`
  37. Result string `json:"result,omitempty"`
  38. Type string `json:"type,omitempty"`
  39. }
  40. //type Box struct {
  41. // X int32 `json:"x,omitempty"`
  42. // Y int32 `json:"y,omitempty"`
  43. // W int32 `json:"w,omitempty"`
  44. // H int32 `json:"h,omitempty"`
  45. //}
  46. //type FeatureResult struct {
  47. // Features []float64 `json:"features,omitempty"`
  48. // Box Box `json:"box,omitempty"`
  49. //}
  50. type EncodedRequest struct {
  51. Name string `json:"name,omitempty"`
  52. Size int `json:"size,omitempty"`
  53. }
  54. type ShelfMessage struct {
  55. Id string `json:"id,omitempty"`
  56. Theme string `json:"theme,omitempty"`
  57. }
  58. type ShelfMessageOut struct {
  59. Id int64 `json:"id,omitempty"`
  60. Theme string `json:"theme,omitempty"`
  61. }
  62. type BookMessage struct {
  63. Id int64 `json:"id,omitempty"`
  64. Author string `json:"author,omitempty"`
  65. Title string `json:"title,omitempty"`
  66. }
  67. type MessageMessage struct {
  68. Text string `json:"text,omitempty"`
  69. }
  70. func TestRestService(t *testing.T) {
  71. // mock server, the port is set in the sample.json
  72. l, err := net.Listen("tcp", "127.0.0.1:51234")
  73. if err != nil {
  74. t.Error(err)
  75. t.FailNow()
  76. }
  77. count := 0
  78. router := mux.NewRouter()
  79. router.HandleFunc("/SayHello", func(w http.ResponseWriter, r *http.Request) {
  80. body := &HelloRequest{}
  81. err := json.NewDecoder(r.Body).Decode(body)
  82. if err != nil {
  83. http.Error(w, err.Error(), http.StatusBadRequest)
  84. }
  85. out := &HelloReply{Message: body.Name}
  86. jsonOut(w, err, out)
  87. }).Methods(http.MethodPost)
  88. router.HandleFunc("/object_detection", func(w http.ResponseWriter, r *http.Request) {
  89. req := &ObjectDetectRequest{}
  90. err := json.NewDecoder(r.Body).Decode(req)
  91. if err != nil {
  92. http.Error(w, err.Error(), http.StatusBadRequest)
  93. }
  94. if req.Image == "" {
  95. http.Error(w, "image is not found", http.StatusBadRequest)
  96. }
  97. out := &ObjectDetectResponse{
  98. Info: req.Command,
  99. Code: 200,
  100. Image: req.Image,
  101. Result: req.Command + " success",
  102. Type: "S",
  103. }
  104. jsonOut(w, err, out)
  105. }).Methods(http.MethodPost)
  106. router.HandleFunc("/getStatus", func(w http.ResponseWriter, r *http.Request) {
  107. result := count%2 == 0
  108. count++
  109. io.WriteString(w, fmt.Sprintf("%v", result))
  110. }).Methods(http.MethodPost)
  111. router.HandleFunc("/RestEncodedJson", func(w http.ResponseWriter, r *http.Request) {
  112. req := &EncodedRequest{}
  113. err := json.NewDecoder(r.Body).Decode(req)
  114. if err != nil {
  115. http.Error(w, err.Error(), http.StatusBadRequest)
  116. }
  117. io.WriteString(w, req.Name)
  118. }).Methods(http.MethodPost)
  119. router.HandleFunc("/bookshelf/v1/shelves", func(w http.ResponseWriter, r *http.Request) {
  120. req := &ShelfMessage{}
  121. err := json.NewDecoder(r.Body).Decode(req)
  122. if err != nil {
  123. http.Error(w, err.Error(), http.StatusBadRequest)
  124. }
  125. if req.Id == "" || req.Theme == "" {
  126. http.Error(w, "empty request", http.StatusBadRequest)
  127. }
  128. idint, _ := strconv.Atoi(req.Id)
  129. out := ShelfMessageOut{Id: int64(idint), Theme: req.Theme}
  130. jsonOut(w, err, out)
  131. }).Methods(http.MethodPost)
  132. router.HandleFunc("/bookshelf/v1/shelves/{shelf}/books/{book}", func(w http.ResponseWriter, r *http.Request) {
  133. defer r.Body.Close()
  134. vars := mux.Vars(r)
  135. shelf, book := vars["shelf"], vars["book"]
  136. if shelf == "" || book == "" {
  137. http.Error(w, "empty request", http.StatusBadRequest)
  138. }
  139. idint, _ := strconv.Atoi(book)
  140. out := BookMessage{Id: int64(idint), Author: "NA", Title: "title_" + book}
  141. jsonOut(w, err, out)
  142. }).Methods(http.MethodGet)
  143. router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
  144. defer r.Body.Close()
  145. vars := mux.Vars(r)
  146. name := vars["name"]
  147. if name == "" {
  148. http.Error(w, "empty request", http.StatusBadRequest)
  149. }
  150. out := MessageMessage{Text: name + " content"}
  151. jsonOut(w, err, out)
  152. }).Methods(http.MethodGet)
  153. router.HandleFunc("/messaging/v1/messages/filter/{name}", func(w http.ResponseWriter, r *http.Request) {
  154. defer r.Body.Close()
  155. vars := mux.Vars(r)
  156. name := vars["name"]
  157. q := r.URL.Query()
  158. rev, sub := q.Get("revision"), q.Get("sub.subfield")
  159. if name == "" || rev == "" || sub == "" {
  160. http.Error(w, "empty request", http.StatusBadRequest)
  161. }
  162. out := MessageMessage{Text: name + rev + sub}
  163. jsonOut(w, err, out)
  164. }).Methods(http.MethodGet)
  165. router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
  166. defer r.Body.Close()
  167. vars := mux.Vars(r)
  168. name := vars["name"]
  169. if name == "" {
  170. http.Error(w, "empty request", http.StatusBadRequest)
  171. }
  172. body := &MessageMessage{}
  173. err := json.NewDecoder(r.Body).Decode(body)
  174. if err != nil {
  175. http.Error(w, err.Error(), http.StatusBadRequest)
  176. }
  177. out := MessageMessage{Text: body.Text}
  178. jsonOut(w, err, out)
  179. }).Methods(http.MethodPut, http.MethodPatch)
  180. server := httptest.NewUnstartedServer(router)
  181. server.Listener.Close()
  182. server.Listener = l
  183. // Start the server.
  184. server.Start()
  185. defer server.Close()
  186. //Reset
  187. streamList := []string{"helloStr", "commands", "fakeBin", "shelves", "demo", "mes"}
  188. topotest.HandleStream(false, streamList, t)
  189. //Data setup
  190. var tests = []topotest.RuleTest{
  191. {
  192. Name: `TestRestRule1`,
  193. Sql: `SELECT helloFromRest(name) as wc FROM helloStr`,
  194. R: [][]map[string]interface{}{
  195. {{
  196. "wc": map[string]interface{}{
  197. "message": "world",
  198. },
  199. }},
  200. {{
  201. "wc": map[string]interface{}{
  202. "message": "golang",
  203. },
  204. }},
  205. {{
  206. "wc": map[string]interface{}{
  207. "message": "peacock",
  208. },
  209. }},
  210. },
  211. M: map[string]interface{}{
  212. "op_2_project_0_exceptions_total": int64(0),
  213. "op_2_project_0_process_latency_us": int64(0),
  214. "op_2_project_0_records_in_total": int64(3),
  215. "op_2_project_0_records_out_total": int64(3),
  216. "sink_mockSink_0_exceptions_total": int64(0),
  217. "sink_mockSink_0_records_in_total": int64(3),
  218. "sink_mockSink_0_records_out_total": int64(3),
  219. },
  220. }, {
  221. Name: `TestRestRule2`,
  222. Sql: `SELECT objectDetectFromRest(cmd, base64_img)->result FROM commands`,
  223. R: [][]map[string]interface{}{
  224. {{
  225. "kuiper_field_0": "get success",
  226. }},
  227. {{
  228. "kuiper_field_0": "detect success",
  229. }},
  230. {{
  231. "kuiper_field_0": "delete success",
  232. }},
  233. },
  234. M: map[string]interface{}{
  235. "op_2_project_0_exceptions_total": int64(0),
  236. "op_2_project_0_process_latency_us": int64(0),
  237. "op_2_project_0_records_in_total": int64(3),
  238. "op_2_project_0_records_out_total": int64(3),
  239. "sink_mockSink_0_exceptions_total": int64(0),
  240. "sink_mockSink_0_records_in_total": int64(3),
  241. "sink_mockSink_0_records_out_total": int64(3),
  242. },
  243. }, {
  244. Name: `TestRestRule3`,
  245. Sql: `SELECT objectDetectFromRest(*)->result FROM commands`,
  246. R: [][]map[string]interface{}{
  247. {{
  248. "kuiper_field_0": "get success",
  249. }},
  250. {{
  251. "kuiper_field_0": "detect success",
  252. }},
  253. {{
  254. "kuiper_field_0": "delete success",
  255. }},
  256. },
  257. M: map[string]interface{}{
  258. "op_2_project_0_exceptions_total": int64(0),
  259. "op_2_project_0_process_latency_us": int64(0),
  260. "op_2_project_0_records_in_total": int64(3),
  261. "op_2_project_0_records_out_total": int64(3),
  262. "sink_mockSink_0_exceptions_total": int64(0),
  263. "sink_mockSink_0_records_in_total": int64(3),
  264. "sink_mockSink_0_records_out_total": int64(3),
  265. },
  266. //}, {
  267. // Name: `TestRestRule3`,
  268. // Sql: `SELECT getFeatureFromRest(self)->feature[0]->box->h FROM fakeBin`,
  269. // R: [][]map[string]interface{}{
  270. // {{
  271. // "kuiper_field_0": 106,
  272. // }},
  273. // {{
  274. // "kuiper_field_0": 107,
  275. // }},
  276. // {{
  277. // "kuiper_field_0": 108,
  278. // }},
  279. // },
  280. // M: map[string]interface{}{
  281. // "op_2_project_0_exceptions_total": int64(0),
  282. // "op_2_project_0_process_latency_us": int64(0),
  283. // "op_2_project_0_records_in_total": int64(3),
  284. // "op_2_project_0_records_out_total": int64(3),
  285. //
  286. // "sink_mockSink_0_exceptions_total": int64(0),
  287. // "sink_mockSink_0_records_in_total": int64(3),
  288. // "sink_mockSink_0_records_out_total": int64(3),
  289. // },
  290. }, {
  291. Name: `TestRestRule4`,
  292. Sql: `SELECT getStatusFromRest(), cmd FROM commands`,
  293. R: [][]map[string]interface{}{
  294. {{
  295. "getStatusFromRest": true,
  296. "cmd": "get",
  297. }},
  298. {{
  299. "getStatusFromRest": false,
  300. "cmd": "detect",
  301. }},
  302. {{
  303. "getStatusFromRest": true,
  304. "cmd": "delete",
  305. }},
  306. },
  307. M: map[string]interface{}{
  308. "op_2_project_0_exceptions_total": int64(0),
  309. "op_2_project_0_process_latency_us": int64(0),
  310. "op_2_project_0_records_in_total": int64(3),
  311. "op_2_project_0_records_out_total": int64(3),
  312. "sink_mockSink_0_exceptions_total": int64(0),
  313. "sink_mockSink_0_records_in_total": int64(3),
  314. "sink_mockSink_0_records_out_total": int64(3),
  315. },
  316. }, {
  317. Name: `TestRestRule5`,
  318. Sql: `SELECT restEncodedJson(encoded_json) as name FROM commands`,
  319. R: [][]map[string]interface{}{
  320. {{
  321. "name": "name1",
  322. }},
  323. {{
  324. "name": "name2",
  325. }},
  326. {{
  327. "name": "name3",
  328. }},
  329. },
  330. M: map[string]interface{}{
  331. "op_2_project_0_exceptions_total": int64(0),
  332. "op_2_project_0_process_latency_us": int64(0),
  333. "op_2_project_0_records_in_total": int64(3),
  334. "op_2_project_0_records_out_total": int64(3),
  335. "sink_mockSink_0_exceptions_total": int64(0),
  336. "sink_mockSink_0_records_in_total": int64(3),
  337. "sink_mockSink_0_records_out_total": int64(3),
  338. },
  339. }, {
  340. Name: `TestRestRule6`,
  341. Sql: `SELECT CreateShelf(shelf)->theme as theme FROM shelves`,
  342. R: [][]map[string]interface{}{
  343. {{
  344. "theme": "tandra",
  345. }},
  346. {{
  347. "theme": "claro",
  348. }},
  349. {{
  350. "theme": "dark",
  351. }},
  352. },
  353. M: map[string]interface{}{
  354. "op_2_project_0_exceptions_total": int64(0),
  355. "op_2_project_0_process_latency_us": int64(0),
  356. "op_2_project_0_records_in_total": int64(3),
  357. "op_2_project_0_records_out_total": int64(3),
  358. "sink_mockSink_0_exceptions_total": int64(0),
  359. "sink_mockSink_0_records_in_total": int64(3),
  360. "sink_mockSink_0_records_out_total": int64(3),
  361. },
  362. }, {
  363. Name: `TestRestRule7`,
  364. Sql: `SELECT GetBook(size, ts)->title as title FROM demo WHERE size > 3 `,
  365. R: [][]map[string]interface{}{
  366. {{
  367. "title": "title_1541152486822",
  368. }},
  369. {{
  370. "title": "title_1541152488442",
  371. }},
  372. },
  373. M: map[string]interface{}{
  374. "op_2_filter_0_exceptions_total": int64(0),
  375. "op_2_filter_0_process_latency_us": int64(0),
  376. "op_2_filter_0_records_in_total": int64(5),
  377. "op_2_filter_0_records_out_total": int64(2),
  378. "sink_mockSink_0_exceptions_total": int64(0),
  379. "sink_mockSink_0_records_in_total": int64(2),
  380. "sink_mockSink_0_records_out_total": int64(2),
  381. },
  382. }, {
  383. Name: `TestRestRule8`,
  384. Sql: `SELECT GetMessage(concat("messages/",ts))->text as message FROM demo WHERE size > 3`,
  385. R: [][]map[string]interface{}{
  386. {{
  387. "message": "1541152486822 content",
  388. }},
  389. {{
  390. "message": "1541152488442 content",
  391. }},
  392. },
  393. M: map[string]interface{}{
  394. "op_2_filter_0_exceptions_total": int64(0),
  395. "op_2_filter_0_process_latency_us": int64(0),
  396. "op_2_filter_0_records_in_total": int64(5),
  397. "op_2_filter_0_records_out_total": int64(2),
  398. "sink_mockSink_0_exceptions_total": int64(0),
  399. "sink_mockSink_0_records_in_total": int64(2),
  400. "sink_mockSink_0_records_out_total": int64(2),
  401. },
  402. }, {
  403. Name: `TestRestRule9`,
  404. Sql: `SELECT SearchMessage(name, size, shelf)->text as message FROM shelves`,
  405. R: [][]map[string]interface{}{
  406. {{
  407. "message": "name12sub1",
  408. }},
  409. {{
  410. "message": "name23sub2",
  411. }},
  412. {{
  413. "message": "name34sub3",
  414. }},
  415. },
  416. M: map[string]interface{}{
  417. "op_2_project_0_exceptions_total": int64(0),
  418. "op_2_project_0_process_latency_us": int64(0),
  419. "op_2_project_0_records_in_total": int64(3),
  420. "op_2_project_0_records_out_total": int64(3),
  421. "sink_mockSink_0_exceptions_total": int64(0),
  422. "sink_mockSink_0_records_in_total": int64(3),
  423. "sink_mockSink_0_records_out_total": int64(3),
  424. },
  425. // TODO support * as one of the parameters
  426. //},{
  427. // Name: `TestRestRule10`,
  428. // Sql: `SELECT UpdateMessage(message_id, *)->text as message FROM mes`,
  429. // R: [][]map[string]interface{}{
  430. // {{
  431. // "message": "message1",
  432. // }},
  433. // {{
  434. // "message": "message2",
  435. // }},
  436. // {{
  437. // "message": "message3",
  438. // }},
  439. // },
  440. // M: map[string]interface{}{
  441. // "op_2_project_0_exceptions_total": int64(0),
  442. // "op_2_project_0_process_latency_us": int64(0),
  443. // "op_2_project_0_records_in_total": int64(3),
  444. // "op_2_project_0_records_out_total": int64(3),
  445. //
  446. // "sink_mockSink_0_exceptions_total": int64(0),
  447. // "sink_mockSink_0_records_in_total": int64(3),
  448. // "sink_mockSink_0_records_out_total": int64(3),
  449. // },
  450. }, {
  451. Name: `TestRestRule11`,
  452. Sql: `SELECT PatchMessage(message_id, text)->text as message FROM mes`,
  453. R: [][]map[string]interface{}{
  454. {{
  455. "message": "message1",
  456. }},
  457. {{
  458. "message": "message2",
  459. }},
  460. {{
  461. "message": "message3",
  462. }},
  463. },
  464. M: map[string]interface{}{
  465. "op_2_project_0_exceptions_total": int64(0),
  466. "op_2_project_0_process_latency_us": int64(0),
  467. "op_2_project_0_records_in_total": int64(3),
  468. "op_2_project_0_records_out_total": int64(3),
  469. "sink_mockSink_0_exceptions_total": int64(0),
  470. "sink_mockSink_0_records_in_total": int64(3),
  471. "sink_mockSink_0_records_out_total": int64(3),
  472. },
  473. },
  474. }
  475. topotest.HandleStream(true, streamList, t)
  476. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  477. BufferLength: 100,
  478. SendError: true,
  479. }, 0)
  480. }
  481. func jsonOut(w http.ResponseWriter, err error, out interface{}) {
  482. w.Header().Add("Content-Type", "application/json")
  483. enc := json.NewEncoder(w)
  484. err = enc.Encode(out)
  485. // Problems encoding
  486. if err != nil {
  487. http.Error(w, err.Error(), http.StatusBadRequest)
  488. }
  489. }
  490. type Resolver map[string]reflect.Value
  491. func (self Resolver) Resolve(name string, _ []reflect.Value) (reflect.Value, error) {
  492. return self[name], nil
  493. }
  494. func SayHello(name string) map[string]interface{} {
  495. return map[string]interface{}{
  496. "message": name,
  497. }
  498. }
  499. func get_feature(img []byte) map[string]interface{} {
  500. l := len(string(img))
  501. return map[string]interface{}{
  502. "feature": []map[string]interface{}{
  503. {
  504. "features": []float64{-1.444, 2.55452, 5.121},
  505. "box": map[string]interface{}{
  506. "x": 153,
  507. "y": 107,
  508. "w": 174,
  509. "h": 100 + l,
  510. },
  511. }, {
  512. "features": []float64{1.444, -2.55452, -5.121},
  513. "box": map[string]interface{}{
  514. "x": 257,
  515. "y": 92,
  516. "w": 169,
  517. "h": 208,
  518. },
  519. },
  520. },
  521. }
  522. }
  523. func object_detection(command string, image string) map[string]interface{} {
  524. out := map[string]interface{}{
  525. "info": command,
  526. "code": 200,
  527. "image": image,
  528. "result": command + " success",
  529. "type": "S",
  530. }
  531. return out
  532. }
  533. func getStatus() bool {
  534. return true
  535. }
  536. func TestMsgpackService(t *testing.T) {
  537. // mock server
  538. res := Resolver{"SayHello": reflect.ValueOf(SayHello), "object_detection": reflect.ValueOf(object_detection), "get_feature": reflect.ValueOf(get_feature), "getStatus": reflect.ValueOf(getStatus)}
  539. serv := rpc.NewServer(res, true, nil)
  540. l, _ := net.Listen("tcp", ":50000")
  541. serv.Listen(l)
  542. go serv.Run()
  543. // Comment out because the bug in the msgpack rpc
  544. // defer serv.Stop()
  545. //Reset
  546. streamList := []string{"helloStr", "commands", "fakeBin"}
  547. topotest.HandleStream(false, streamList, t)
  548. //Data setup
  549. var tests = []topotest.RuleTest{
  550. {
  551. Name: `TestRestRule1`,
  552. Sql: `SELECT helloFromMsgpack(name) as wc FROM helloStr`,
  553. R: [][]map[string]interface{}{
  554. {{
  555. "wc": map[string]interface{}{
  556. "message": "world",
  557. },
  558. }},
  559. {{
  560. "wc": map[string]interface{}{
  561. "message": "golang",
  562. },
  563. }},
  564. {{
  565. "wc": map[string]interface{}{
  566. "message": "peacock",
  567. },
  568. }},
  569. },
  570. M: map[string]interface{}{
  571. "op_2_project_0_exceptions_total": int64(0),
  572. "op_2_project_0_process_latency_us": int64(0),
  573. "op_2_project_0_records_in_total": int64(3),
  574. "op_2_project_0_records_out_total": int64(3),
  575. "sink_mockSink_0_exceptions_total": int64(0),
  576. "sink_mockSink_0_records_in_total": int64(3),
  577. "sink_mockSink_0_records_out_total": int64(3),
  578. },
  579. }, {
  580. Name: `TestRestRule2`,
  581. Sql: `SELECT objectDetectFromMsgpack(*)->result FROM commands`,
  582. R: [][]map[string]interface{}{
  583. {{
  584. "kuiper_field_0": "get success",
  585. }},
  586. {{
  587. "kuiper_field_0": "detect success",
  588. }},
  589. {{
  590. "kuiper_field_0": "delete success",
  591. }},
  592. },
  593. M: map[string]interface{}{
  594. "op_2_project_0_exceptions_total": int64(0),
  595. "op_2_project_0_process_latency_us": int64(0),
  596. "op_2_project_0_records_in_total": int64(3),
  597. "op_2_project_0_records_out_total": int64(3),
  598. "sink_mockSink_0_exceptions_total": int64(0),
  599. "sink_mockSink_0_records_in_total": int64(3),
  600. "sink_mockSink_0_records_out_total": int64(3),
  601. },
  602. }, {
  603. Name: `TestRestRule3`,
  604. Sql: `SELECT getFeatureFromMsgpack(self)->feature[0]->box->h FROM fakeBin`,
  605. R: [][]map[string]interface{}{
  606. {{
  607. "kuiper_field_0": float64(106), //Convert by the testing tool
  608. }},
  609. {{
  610. "kuiper_field_0": float64(107),
  611. }},
  612. {{
  613. "kuiper_field_0": float64(108),
  614. }},
  615. },
  616. M: map[string]interface{}{
  617. "op_2_project_0_exceptions_total": int64(0),
  618. "op_2_project_0_process_latency_us": int64(0),
  619. "op_2_project_0_records_in_total": int64(3),
  620. "op_2_project_0_records_out_total": int64(3),
  621. "sink_mockSink_0_exceptions_total": int64(0),
  622. "sink_mockSink_0_records_in_total": int64(3),
  623. "sink_mockSink_0_records_out_total": int64(3),
  624. },
  625. //}, {
  626. // Name: `TestRestRule4`,
  627. // Sql: `SELECT getStatusFromMsgpack(), command FROM commands`,
  628. // R: [][]map[string]interface{}{
  629. // {{
  630. // "getStatusFromRest": true,
  631. // "command": "get",
  632. // }},
  633. // {{
  634. // "getStatusFromRest": true,
  635. // "command": "detect",
  636. // }},
  637. // {{
  638. // "getStatusFromRest": true,
  639. // "command": "delete",
  640. // }},
  641. // },
  642. // M: map[string]interface{}{
  643. // "op_2_project_0_exceptions_total": int64(0),
  644. // "op_2_project_0_process_latency_us": int64(0),
  645. // "op_2_project_0_records_in_total": int64(3),
  646. // "op_2_project_0_records_out_total": int64(3),
  647. //
  648. // "sink_mockSink_0_exceptions_total": int64(0),
  649. // "sink_mockSink_0_records_in_total": int64(3),
  650. // "sink_mockSink_0_records_out_total": int64(3),
  651. // },
  652. },
  653. }
  654. topotest.HandleStream(true, streamList, t)
  655. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  656. BufferLength: 100,
  657. SendError: true,
  658. }, 0)
  659. }
  660. type server struct {
  661. pb.UnimplementedGreeterServer
  662. }
  663. func (s *server) SayHello(_ context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  664. return &pb.HelloReply{Message: in.GetName()}, nil
  665. }
  666. func (s *server) ObjectDetection(_ context.Context, in *pb.ObjectDetectionRequest) (*pb.ObjectDetectionResponse, error) {
  667. return &pb.ObjectDetectionResponse{
  668. Info: in.Cmd,
  669. Code: 200,
  670. Image: in.Base64Img,
  671. Result: in.Cmd + " success",
  672. Type: "S",
  673. }, nil
  674. }
  675. func (s *server) GetFeature(_ context.Context, v *wrappers.BytesValue) (*pb.FeatureResponse, error) {
  676. l := len(string(v.Value))
  677. return &pb.FeatureResponse{
  678. Feature: []*pb.FeatureResult{
  679. {
  680. Features: []float32{-1.444, 2.55452, 5.121},
  681. Box: &pb.Box{
  682. X: 153,
  683. Y: 107,
  684. W: 174,
  685. H: int32(100 + l),
  686. },
  687. },
  688. {
  689. Features: []float32{1.444, -2.55452, -5.121},
  690. Box: &pb.Box{
  691. X: 257,
  692. Y: 92,
  693. W: 169,
  694. H: 208,
  695. },
  696. },
  697. },
  698. }, nil
  699. }
  700. func (s *server) GetStatus(context.Context, *empty.Empty) (*wrappers.BoolValue, error) {
  701. return &wrappers.BoolValue{Value: true}, nil
  702. }
  703. func TestGrpcService(t *testing.T) {
  704. lis, err := net.Listen("tcp", ":50051")
  705. if err != nil {
  706. kconf.Log.Fatalf("failed to listen: %v", err)
  707. }
  708. s := grpc.NewServer()
  709. pb.RegisterGreeterServer(s, &server{})
  710. go func() {
  711. if err := s.Serve(lis); err != nil {
  712. kconf.Log.Fatalf("failed to serve: %v", err)
  713. }
  714. }()
  715. defer s.Stop()
  716. //Reset
  717. streamList := []string{"helloStr", "commands", "fakeBin"}
  718. topotest.HandleStream(false, streamList, t)
  719. //Data setup
  720. var tests = []topotest.RuleTest{
  721. {
  722. Name: `TestRestRule1`,
  723. Sql: `SELECT helloFromGrpc(name) as wc FROM helloStr`,
  724. R: [][]map[string]interface{}{
  725. {{
  726. "wc": map[string]interface{}{
  727. "message": "world",
  728. },
  729. }},
  730. {{
  731. "wc": map[string]interface{}{
  732. "message": "golang",
  733. },
  734. }},
  735. {{
  736. "wc": map[string]interface{}{
  737. "message": "peacock",
  738. },
  739. }},
  740. },
  741. M: map[string]interface{}{
  742. "op_2_project_0_exceptions_total": int64(0),
  743. "op_2_project_0_process_latency_us": int64(0),
  744. "op_2_project_0_records_in_total": int64(3),
  745. "op_2_project_0_records_out_total": int64(3),
  746. "sink_mockSink_0_exceptions_total": int64(0),
  747. "sink_mockSink_0_records_in_total": int64(3),
  748. "sink_mockSink_0_records_out_total": int64(3),
  749. },
  750. }, {
  751. Name: `TestRestRule2`,
  752. Sql: `SELECT objectDetectFromGrpc(cmd, base64_img)->result FROM commands`,
  753. R: [][]map[string]interface{}{
  754. {{
  755. "kuiper_field_0": "get success",
  756. }},
  757. {{
  758. "kuiper_field_0": "detect success",
  759. }},
  760. {{
  761. "kuiper_field_0": "delete success",
  762. }},
  763. },
  764. M: map[string]interface{}{
  765. "op_2_project_0_exceptions_total": int64(0),
  766. "op_2_project_0_process_latency_us": int64(0),
  767. "op_2_project_0_records_in_total": int64(3),
  768. "op_2_project_0_records_out_total": int64(3),
  769. "sink_mockSink_0_exceptions_total": int64(0),
  770. "sink_mockSink_0_records_in_total": int64(3),
  771. "sink_mockSink_0_records_out_total": int64(3),
  772. },
  773. }, {
  774. Name: `TestRestRule3`,
  775. Sql: `SELECT getFeatureFromGrpc(self)->feature[0]->box->h FROM fakeBin`,
  776. R: [][]map[string]interface{}{
  777. {{
  778. "kuiper_field_0": float64(106), //Convert by the testing tool
  779. }},
  780. {{
  781. "kuiper_field_0": float64(107),
  782. }},
  783. {{
  784. "kuiper_field_0": float64(108),
  785. }},
  786. },
  787. M: map[string]interface{}{
  788. "op_2_project_0_exceptions_total": int64(0),
  789. "op_2_project_0_process_latency_us": int64(0),
  790. "op_2_project_0_records_in_total": int64(3),
  791. "op_2_project_0_records_out_total": int64(3),
  792. "sink_mockSink_0_exceptions_total": int64(0),
  793. "sink_mockSink_0_records_in_total": int64(3),
  794. "sink_mockSink_0_records_out_total": int64(3),
  795. },
  796. }, {
  797. Name: `TestRestRule4`,
  798. Sql: `SELECT getStatusFromGrpc(), cmd FROM commands`,
  799. R: [][]map[string]interface{}{
  800. {{
  801. "getStatusFromGrpc": true,
  802. "cmd": "get",
  803. }},
  804. {{
  805. "getStatusFromGrpc": true,
  806. "cmd": "detect",
  807. }},
  808. {{
  809. "getStatusFromGrpc": true,
  810. "cmd": "delete",
  811. }},
  812. },
  813. M: map[string]interface{}{
  814. "op_2_project_0_exceptions_total": int64(0),
  815. "op_2_project_0_process_latency_us": int64(0),
  816. "op_2_project_0_records_in_total": int64(3),
  817. "op_2_project_0_records_out_total": int64(3),
  818. "sink_mockSink_0_exceptions_total": int64(0),
  819. "sink_mockSink_0_records_in_total": int64(3),
  820. "sink_mockSink_0_records_out_total": int64(3),
  821. },
  822. },
  823. }
  824. topotest.HandleStream(true, streamList, t)
  825. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  826. BufferLength: 100,
  827. SendError: true,
  828. }, 0)
  829. }