external_service_rule_test.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "io"
  20. "net"
  21. "net/http"
  22. "net/http/httptest"
  23. "reflect"
  24. "strconv"
  25. "testing"
  26. "github.com/golang/protobuf/ptypes/empty"
  27. "github.com/golang/protobuf/ptypes/wrappers"
  28. "github.com/gorilla/mux"
  29. "google.golang.org/grpc"
  30. kconf "github.com/lf-edge/ekuiper/internal/conf"
  31. "github.com/lf-edge/ekuiper/internal/topo/topotest"
  32. "github.com/lf-edge/ekuiper/pkg/api"
  33. )
// RestHelloRequest is the JSON request body decoded by the mock /SayHello
// handler in TestRestService.
type RestHelloRequest struct {
	// Name is echoed back as the Message of the reply.
	Name string `json:"name,omitempty"`
}
// RestHelloReply is the JSON response body written by the mock /SayHello
// handler in TestRestService.
type RestHelloReply struct {
	// Message carries the Name received in the request.
	Message string `json:"message,omitempty"`
}
// ObjectDetectRequest is the JSON request body decoded by the mock
// /object_detection handler in TestRestService.
type ObjectDetectRequest struct {
	// Command is read from the "cmd" key.
	Command string `json:"cmd,omitempty"`
	// Image is read from the "base64_img" key; the handler rejects the
	// request when it is empty.
	Image string `json:"base64_img,omitempty"`
}
// ObjectDetectResponse is the JSON response body written by the mock
// /object_detection handler in TestRestService.
//
// NOTE(review): Info and Code are serialized under the request's key names
// ("cmd" and "base64_img") rather than "info" and "code". This looks like a
// copy-paste from ObjectDetectRequest, but the expected results of
// TestRestRule12 contain neither key, so the test may rely on it — confirm
// before changing the tags.
type ObjectDetectResponse struct {
	Info   string `json:"cmd,omitempty"`
	Code   int    `json:"base64_img,omitempty"`
	Image  string `json:"image,omitempty"`
	Result string `json:"result,omitempty"`
	Type   string `json:"type,omitempty"`
}
  51. //type Box struct {
  52. // X int32 `json:"x,omitempty"`
  53. // Y int32 `json:"y,omitempty"`
  54. // W int32 `json:"w,omitempty"`
  55. // H int32 `json:"h,omitempty"`
  56. //}
  57. //type FeatureResult struct {
  58. // Features []float64 `json:"features,omitempty"`
  59. // Box Box `json:"box,omitempty"`
  60. //}
// EncodedRequest is the JSON request body decoded by the mock
// /RestEncodedJson handler in TestRestService.
type EncodedRequest struct {
	// Name is written back verbatim as the response body.
	Name string `json:"name,omitempty"`
	// Size is decoded but not read by the mock handler.
	Size int `json:"size,omitempty"`
}
// ShelfMessage is the JSON request body decoded by the mock
// /bookshelf/v1/shelves handler in TestRestService. Both fields must be
// non-empty for the handler to accept the request.
type ShelfMessage struct {
	// Id arrives as a string and is converted with strconv.Atoi by the handler.
	Id string `json:"id,omitempty"`
	// Theme is copied unchanged into the response.
	Theme string `json:"theme,omitempty"`
}
// ShelfMessageOut is the JSON response body written by the mock
// /bookshelf/v1/shelves handler in TestRestService.
type ShelfMessageOut struct {
	// Id is the numeric form of the string id received in ShelfMessage.
	Id int64 `json:"id,omitempty"`
	// Theme is the theme received in the request.
	Theme string `json:"theme,omitempty"`
}
// BookMessage is the JSON response body written by the mock
// /bookshelf/v1/shelves/{shelf}/books/{book} handler in TestRestService.
type BookMessage struct {
	// Id is the {book} path variable converted with strconv.Atoi.
	Id int64 `json:"id,omitempty"`
	// Author is always "NA" in the mock.
	Author string `json:"author,omitempty"`
	// Title is "title_" followed by the {book} path variable.
	Title string `json:"title,omitempty"`
}
// MessageMessage is the JSON payload used by the mock /messaging/v1/messages
// handlers in TestRestService: it is the response body of the GET handlers and
// both the request and response body of the PUT/PATCH handler.
type MessageMessage struct {
	Text string `json:"text,omitempty"`
}
  81. func TestRestService(t *testing.T) {
  82. // mock server, the port is set in the sample.json
  83. l, err := net.Listen("tcp", "127.0.0.1:51234")
  84. if err != nil {
  85. t.Error(err)
  86. t.FailNow()
  87. }
  88. count := 0
  89. router := mux.NewRouter()
  90. router.HandleFunc("/SayHello", func(w http.ResponseWriter, r *http.Request) {
  91. body := &RestHelloRequest{}
  92. err := json.NewDecoder(r.Body).Decode(body)
  93. if err != nil {
  94. http.Error(w, err.Error(), http.StatusBadRequest)
  95. }
  96. out := &RestHelloReply{Message: body.Name}
  97. jsonOut(w, out)
  98. }).Methods(http.MethodPost)
  99. router.HandleFunc("/object_detection", func(w http.ResponseWriter, r *http.Request) {
  100. req := &ObjectDetectRequest{}
  101. err := json.NewDecoder(r.Body).Decode(req)
  102. if err != nil {
  103. http.Error(w, err.Error(), http.StatusBadRequest)
  104. }
  105. if req.Image == "" {
  106. http.Error(w, "image is not found", http.StatusBadRequest)
  107. }
  108. out := &ObjectDetectResponse{
  109. Info: req.Command,
  110. Code: 200,
  111. Image: req.Image,
  112. Result: req.Command + " success",
  113. Type: "S",
  114. }
  115. jsonOut(w, out)
  116. }).Methods(http.MethodPost)
  117. router.HandleFunc("/getStatus", func(w http.ResponseWriter, r *http.Request) {
  118. result := count%2 == 0
  119. count++
  120. io.WriteString(w, fmt.Sprintf("%v", result))
  121. }).Methods(http.MethodPost)
  122. router.HandleFunc("/RestEncodedJson", func(w http.ResponseWriter, r *http.Request) {
  123. req := &EncodedRequest{}
  124. err := json.NewDecoder(r.Body).Decode(req)
  125. if err != nil {
  126. http.Error(w, err.Error(), http.StatusBadRequest)
  127. }
  128. io.WriteString(w, req.Name)
  129. }).Methods(http.MethodPost)
  130. router.HandleFunc("/bookshelf/v1/shelves", func(w http.ResponseWriter, r *http.Request) {
  131. req := &ShelfMessage{}
  132. err := json.NewDecoder(r.Body).Decode(req)
  133. if err != nil {
  134. http.Error(w, err.Error(), http.StatusBadRequest)
  135. }
  136. if req.Id == "" || req.Theme == "" {
  137. http.Error(w, "empty request", http.StatusBadRequest)
  138. }
  139. idint, _ := strconv.Atoi(req.Id)
  140. out := ShelfMessageOut{Id: int64(idint), Theme: req.Theme}
  141. jsonOut(w, out)
  142. }).Methods(http.MethodPost)
  143. router.HandleFunc("/bookshelf/v1/shelves/{shelf}/books/{book}", func(w http.ResponseWriter, r *http.Request) {
  144. defer r.Body.Close()
  145. vars := mux.Vars(r)
  146. shelf, book := vars["shelf"], vars["book"]
  147. if shelf == "" || book == "" {
  148. http.Error(w, "empty request", http.StatusBadRequest)
  149. }
  150. idint, _ := strconv.Atoi(book)
  151. out := BookMessage{Id: int64(idint), Author: "NA", Title: "title_" + book}
  152. jsonOut(w, out)
  153. }).Methods(http.MethodGet)
  154. router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
  155. defer r.Body.Close()
  156. vars := mux.Vars(r)
  157. name := vars["name"]
  158. if name == "" {
  159. http.Error(w, "empty request", http.StatusBadRequest)
  160. }
  161. out := MessageMessage{Text: name + " content"}
  162. jsonOut(w, out)
  163. }).Methods(http.MethodGet)
  164. router.HandleFunc("/messaging/v1/messages/filter/{name}", func(w http.ResponseWriter, r *http.Request) {
  165. defer r.Body.Close()
  166. vars := mux.Vars(r)
  167. name := vars["name"]
  168. q := r.URL.Query()
  169. rev, sub := q.Get("revision"), q.Get("sub.subfield")
  170. if name == "" || rev == "" || sub == "" {
  171. http.Error(w, "empty request", http.StatusBadRequest)
  172. }
  173. out := MessageMessage{Text: name + rev + sub}
  174. jsonOut(w, out)
  175. }).Methods(http.MethodGet)
  176. router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
  177. defer r.Body.Close()
  178. vars := mux.Vars(r)
  179. name := vars["name"]
  180. if name == "" {
  181. http.Error(w, "empty request", http.StatusBadRequest)
  182. }
  183. body := &MessageMessage{}
  184. err := json.NewDecoder(r.Body).Decode(body)
  185. if err != nil {
  186. http.Error(w, err.Error(), http.StatusBadRequest)
  187. }
  188. out := MessageMessage{Text: body.Text}
  189. jsonOut(w, out)
  190. }).Methods(http.MethodPut, http.MethodPatch)
  191. server := httptest.NewUnstartedServer(router)
  192. server.Listener.Close()
  193. server.Listener = l
  194. // Start the server.
  195. server.Start()
  196. defer server.Close()
  197. // Reset
  198. streamList := []string{"helloStr", "commands", "fakeBin", "shelves", "demo", "mes", "optional_commands"}
  199. topotest.HandleStream(false, streamList, t)
  200. // Data setup
  201. tests := []topotest.RuleTest{
  202. {
  203. Name: `TestRestRule1`,
  204. Sql: `SELECT helloFromRest(name) as wc FROM helloStr`,
  205. R: [][]map[string]interface{}{
  206. {{
  207. "wc": map[string]interface{}{
  208. "message": "world",
  209. },
  210. }},
  211. {{
  212. "wc": map[string]interface{}{
  213. "message": "golang",
  214. },
  215. }},
  216. {{
  217. "wc": map[string]interface{}{
  218. "message": "peacock",
  219. },
  220. }},
  221. },
  222. M: map[string]interface{}{
  223. "op_2_project_0_exceptions_total": int64(0),
  224. "op_2_project_0_process_latency_us": int64(0),
  225. "op_2_project_0_records_in_total": int64(3),
  226. "op_2_project_0_records_out_total": int64(3),
  227. "sink_mockSink_0_exceptions_total": int64(0),
  228. "sink_mockSink_0_records_in_total": int64(3),
  229. "sink_mockSink_0_records_out_total": int64(3),
  230. },
  231. }, {
  232. Name: `TestRestRule2`,
  233. Sql: `SELECT objectDetectFromRest(cmd, base64_img)->result FROM commands`,
  234. R: [][]map[string]interface{}{
  235. {{
  236. "kuiper_field_0": "get success",
  237. }},
  238. {{
  239. "kuiper_field_0": "detect success",
  240. }},
  241. {{
  242. "kuiper_field_0": "delete success",
  243. }},
  244. },
  245. M: map[string]interface{}{
  246. "op_2_project_0_exceptions_total": int64(0),
  247. "op_2_project_0_process_latency_us": int64(0),
  248. "op_2_project_0_records_in_total": int64(3),
  249. "op_2_project_0_records_out_total": int64(3),
  250. "sink_mockSink_0_exceptions_total": int64(0),
  251. "sink_mockSink_0_records_in_total": int64(3),
  252. "sink_mockSink_0_records_out_total": int64(3),
  253. },
  254. }, {
  255. Name: `TestRestRule3`,
  256. Sql: `SELECT objectDetectFromRest(*)->result FROM commands`,
  257. R: [][]map[string]interface{}{
  258. {{
  259. "kuiper_field_0": "get success",
  260. }},
  261. {{
  262. "kuiper_field_0": "detect success",
  263. }},
  264. {{
  265. "kuiper_field_0": "delete success",
  266. }},
  267. },
  268. M: map[string]interface{}{
  269. "op_2_project_0_exceptions_total": int64(0),
  270. "op_2_project_0_process_latency_us": int64(0),
  271. "op_2_project_0_records_in_total": int64(3),
  272. "op_2_project_0_records_out_total": int64(3),
  273. "sink_mockSink_0_exceptions_total": int64(0),
  274. "sink_mockSink_0_records_in_total": int64(3),
  275. "sink_mockSink_0_records_out_total": int64(3),
  276. },
  277. //}, {
  278. // Name: `TestRestRule3`,
  279. // Sql: `SELECT getFeatureFromRest(self)->feature[0]->box->h FROM fakeBin`,
  280. // R: [][]map[string]interface{}{
  281. // {{
  282. // "kuiper_field_0": 106,
  283. // }},
  284. // {{
  285. // "kuiper_field_0": 107,
  286. // }},
  287. // {{
  288. // "kuiper_field_0": 108,
  289. // }},
  290. // },
  291. // M: map[string]interface{}{
  292. // "op_2_project_0_exceptions_total": int64(0),
  293. // "op_2_project_0_process_latency_us": int64(0),
  294. // "op_2_project_0_records_in_total": int64(3),
  295. // "op_2_project_0_records_out_total": int64(3),
  296. //
  297. // "sink_mockSink_0_exceptions_total": int64(0),
  298. // "sink_mockSink_0_records_in_total": int64(3),
  299. // "sink_mockSink_0_records_out_total": int64(3),
  300. // },
  301. }, {
  302. Name: `TestRestRule4`,
  303. Sql: `SELECT getStatusFromRest(), cmd FROM commands`,
  304. R: [][]map[string]interface{}{
  305. {{
  306. "getStatusFromRest": true,
  307. "cmd": "get",
  308. }},
  309. {{
  310. "getStatusFromRest": false,
  311. "cmd": "detect",
  312. }},
  313. {{
  314. "getStatusFromRest": true,
  315. "cmd": "delete",
  316. }},
  317. },
  318. M: map[string]interface{}{
  319. "op_2_project_0_exceptions_total": int64(0),
  320. "op_2_project_0_process_latency_us": int64(0),
  321. "op_2_project_0_records_in_total": int64(3),
  322. "op_2_project_0_records_out_total": int64(3),
  323. "sink_mockSink_0_exceptions_total": int64(0),
  324. "sink_mockSink_0_records_in_total": int64(3),
  325. "sink_mockSink_0_records_out_total": int64(3),
  326. },
  327. }, {
  328. Name: `TestRestRule5`,
  329. Sql: `SELECT restEncodedJson(encoded_json) as name FROM commands`,
  330. R: [][]map[string]interface{}{
  331. {{
  332. "name": "name1",
  333. }},
  334. {{
  335. "name": "name2",
  336. }},
  337. {{
  338. "name": "name3",
  339. }},
  340. },
  341. M: map[string]interface{}{
  342. "op_2_project_0_exceptions_total": int64(0),
  343. "op_2_project_0_process_latency_us": int64(0),
  344. "op_2_project_0_records_in_total": int64(3),
  345. "op_2_project_0_records_out_total": int64(3),
  346. "sink_mockSink_0_exceptions_total": int64(0),
  347. "sink_mockSink_0_records_in_total": int64(3),
  348. "sink_mockSink_0_records_out_total": int64(3),
  349. },
  350. }, {
  351. Name: `TestRestRule6`,
  352. Sql: `SELECT CreateShelf(shelf)->theme as theme FROM shelves`,
  353. R: [][]map[string]interface{}{
  354. {{
  355. "theme": "tandra",
  356. }},
  357. {{
  358. "theme": "claro",
  359. }},
  360. {{
  361. "theme": "dark",
  362. }},
  363. },
  364. M: map[string]interface{}{
  365. "op_2_project_0_exceptions_total": int64(0),
  366. "op_2_project_0_process_latency_us": int64(0),
  367. "op_2_project_0_records_in_total": int64(3),
  368. "op_2_project_0_records_out_total": int64(3),
  369. "sink_mockSink_0_exceptions_total": int64(0),
  370. "sink_mockSink_0_records_in_total": int64(3),
  371. "sink_mockSink_0_records_out_total": int64(3),
  372. },
  373. }, {
  374. Name: `TestRestRule7`,
  375. Sql: `SELECT GetBook(size, ts)->title as title FROM demo WHERE size > 3 `,
  376. R: [][]map[string]interface{}{
  377. {{
  378. "title": "title_1541152486822",
  379. }},
  380. {{
  381. "title": "title_1541152488442",
  382. }},
  383. },
  384. M: map[string]interface{}{
  385. "op_2_filter_0_exceptions_total": int64(0),
  386. "op_2_filter_0_process_latency_us": int64(0),
  387. "op_2_filter_0_records_in_total": int64(5),
  388. "op_2_filter_0_records_out_total": int64(2),
  389. "sink_mockSink_0_exceptions_total": int64(0),
  390. "sink_mockSink_0_records_in_total": int64(2),
  391. "sink_mockSink_0_records_out_total": int64(2),
  392. },
  393. }, {
  394. Name: `TestRestRule8`,
  395. Sql: `SELECT GetMessage(concat("messages/",ts))->text as message FROM demo WHERE size > 3`,
  396. R: [][]map[string]interface{}{
  397. {{
  398. "message": "1541152486822 content",
  399. }},
  400. {{
  401. "message": "1541152488442 content",
  402. }},
  403. },
  404. M: map[string]interface{}{
  405. "op_2_filter_0_exceptions_total": int64(0),
  406. "op_2_filter_0_process_latency_us": int64(0),
  407. "op_2_filter_0_records_in_total": int64(5),
  408. "op_2_filter_0_records_out_total": int64(2),
  409. "sink_mockSink_0_exceptions_total": int64(0),
  410. "sink_mockSink_0_records_in_total": int64(2),
  411. "sink_mockSink_0_records_out_total": int64(2),
  412. },
  413. }, {
  414. Name: `TestRestRule9`,
  415. Sql: `SELECT SearchMessage(name, size, shelf)->text as message FROM shelves`,
  416. R: [][]map[string]interface{}{
  417. {{
  418. "message": "name12sub1",
  419. }},
  420. {{
  421. "message": "name23sub2",
  422. }},
  423. {{
  424. "message": "name34sub3",
  425. }},
  426. },
  427. M: map[string]interface{}{
  428. "op_2_project_0_exceptions_total": int64(0),
  429. "op_2_project_0_process_latency_us": int64(0),
  430. "op_2_project_0_records_in_total": int64(3),
  431. "op_2_project_0_records_out_total": int64(3),
  432. "sink_mockSink_0_exceptions_total": int64(0),
  433. "sink_mockSink_0_records_in_total": int64(3),
  434. "sink_mockSink_0_records_out_total": int64(3),
  435. },
  436. // TODO support * as one of the parameters
  437. //},{
  438. // Name: `TestRestRule10`,
  439. // Sql: `SELECT UpdateMessage(message_id, *)->text as message FROM mes`,
  440. // R: [][]map[string]interface{}{
  441. // {{
  442. // "message": "message1",
  443. // }},
  444. // {{
  445. // "message": "message2",
  446. // }},
  447. // {{
  448. // "message": "message3",
  449. // }},
  450. // },
  451. // M: map[string]interface{}{
  452. // "op_2_project_0_exceptions_total": int64(0),
  453. // "op_2_project_0_process_latency_us": int64(0),
  454. // "op_2_project_0_records_in_total": int64(3),
  455. // "op_2_project_0_records_out_total": int64(3),
  456. //
  457. // "sink_mockSink_0_exceptions_total": int64(0),
  458. // "sink_mockSink_0_records_in_total": int64(3),
  459. // "sink_mockSink_0_records_out_total": int64(3),
  460. // },
  461. }, {
  462. Name: `TestRestRule11`,
  463. Sql: `SELECT PatchMessage(message_id, text)->text as message FROM mes`,
  464. R: [][]map[string]interface{}{
  465. {{
  466. "message": "message1",
  467. }},
  468. {{
  469. "message": "message2",
  470. }},
  471. {{
  472. "message": "message3",
  473. }},
  474. },
  475. M: map[string]interface{}{
  476. "op_2_project_0_exceptions_total": int64(0),
  477. "op_2_project_0_process_latency_us": int64(0),
  478. "op_2_project_0_records_in_total": int64(3),
  479. "op_2_project_0_records_out_total": int64(3),
  480. "sink_mockSink_0_exceptions_total": int64(0),
  481. "sink_mockSink_0_records_in_total": int64(3),
  482. "sink_mockSink_0_records_out_total": int64(3),
  483. },
  484. }, {
  485. Name: `TestRestRule12`,
  486. Sql: `SELECT objectDetectFromRest(*) AS res FROM optional_commands`,
  487. R: [][]map[string]interface{}{
  488. {{
  489. "res": map[string]interface{}{
  490. "image": "my image1",
  491. "result": " success",
  492. "type": "S",
  493. },
  494. }},
  495. {{
  496. "res": map[string]interface{}{
  497. "image": "my image2",
  498. "result": " success",
  499. "type": "S",
  500. },
  501. }},
  502. {{
  503. "res": map[string]interface{}{
  504. "image": "my image3",
  505. "result": " success",
  506. "type": "S",
  507. },
  508. }},
  509. },
  510. M: map[string]interface{}{
  511. "op_2_project_0_exceptions_total": int64(0),
  512. "op_2_project_0_process_latency_us": int64(0),
  513. "op_2_project_0_records_in_total": int64(3),
  514. "op_2_project_0_records_out_total": int64(3),
  515. "sink_mockSink_0_exceptions_total": int64(0),
  516. "sink_mockSink_0_records_in_total": int64(3),
  517. "sink_mockSink_0_records_out_total": int64(3),
  518. },
  519. },
  520. }
  521. topotest.HandleStream(true, streamList, t)
  522. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  523. BufferLength: 100,
  524. SendError: true,
  525. }, 0)
  526. }
  527. func jsonOut(w http.ResponseWriter, out interface{}) {
  528. w.Header().Add("Content-Type", "application/json")
  529. enc := json.NewEncoder(w)
  530. err := enc.Encode(out)
  531. // Problems encoding
  532. if err != nil {
  533. http.Error(w, err.Error(), http.StatusBadRequest)
  534. }
  535. }
  536. type Resolver map[string]reflect.Value
  537. func (r Resolver) Resolve(name string, _ []reflect.Value) (reflect.Value, error) {
  538. return r[name], nil
  539. }
  540. func SayHello(name string) map[string]interface{} {
  541. return map[string]interface{}{
  542. "message": name,
  543. }
  544. }
  545. func get_feature(img []byte) map[string]interface{} {
  546. l := len(string(img))
  547. return map[string]interface{}{
  548. "feature": []map[string]interface{}{
  549. {
  550. "features": []float64{-1.444, 2.55452, 5.121},
  551. "box": map[string]interface{}{
  552. "x": 153,
  553. "y": 107,
  554. "w": 174,
  555. "h": 100 + l,
  556. },
  557. }, {
  558. "features": []float64{1.444, -2.55452, -5.121},
  559. "box": map[string]interface{}{
  560. "x": 257,
  561. "y": 92,
  562. "w": 169,
  563. "h": 208,
  564. },
  565. },
  566. },
  567. }
  568. }
  569. func object_detection(command string, image string) map[string]interface{} {
  570. out := map[string]interface{}{
  571. "info": command,
  572. "code": 200,
  573. "image": image,
  574. "result": command + " success",
  575. "type": "S",
  576. }
  577. return out
  578. }
  579. func getStatus() bool {
  580. return true
  581. }
// server is the mock gRPC service registered by TestGrpcService. It embeds
// UnimplementedGreeterServer and defines SayHello, ObjectDetection, GetFeature
// and GetStatus itself.
type server struct {
	UnimplementedGreeterServer
}
  585. func (s *server) SayHello(_ context.Context, in *HelloRequest) (*HelloReply, error) {
  586. return &HelloReply{Message: in.GetName()}, nil
  587. }
  588. func (s *server) ObjectDetection(_ context.Context, in *ObjectDetectionRequest) (*ObjectDetectionResponse, error) {
  589. return &ObjectDetectionResponse{
  590. Info: in.Cmd,
  591. Code: 200,
  592. Image: in.Base64Img,
  593. Result: in.Cmd + " success",
  594. Type: "S",
  595. }, nil
  596. }
  597. func (s *server) GetFeature(_ context.Context, v *wrappers.BytesValue) (*FeatureResponse, error) {
  598. l := len(string(v.Value))
  599. return &FeatureResponse{
  600. Feature: []*FeatureResult{
  601. {
  602. Features: []float32{-1.444, 2.55452, 5.121},
  603. Box: &Box{
  604. X: 153,
  605. Y: 107,
  606. W: 174,
  607. H: int32(100 + l),
  608. },
  609. },
  610. {
  611. Features: []float32{1.444, -2.55452, -5.121},
  612. Box: &Box{
  613. X: 257,
  614. Y: 92,
  615. W: 169,
  616. H: 208,
  617. },
  618. },
  619. },
  620. }, nil
  621. }
  622. func (s *server) GetStatus(context.Context, *empty.Empty) (*wrappers.BoolValue, error) {
  623. return &wrappers.BoolValue{Value: true}, nil
  624. }
  625. func TestGrpcService(t *testing.T) {
  626. lis, err := net.Listen("tcp", ":50051")
  627. if err != nil {
  628. kconf.Log.Fatalf("failed to listen: %v", err)
  629. }
  630. s := grpc.NewServer()
  631. RegisterGreeterServer(s, &server{})
  632. go func() {
  633. if err := s.Serve(lis); err != nil {
  634. kconf.Log.Fatalf("failed to serve: %v", err)
  635. }
  636. }()
  637. defer s.Stop()
  638. // Reset
  639. streamList := []string{"helloStr", "commands", "fakeBin"}
  640. topotest.HandleStream(false, streamList, t)
  641. // Data setup
  642. tests := []topotest.RuleTest{
  643. {
  644. Name: `TestRestRule1`,
  645. Sql: `SELECT helloFromGrpc(name) as wc FROM helloStr`,
  646. R: [][]map[string]interface{}{
  647. {{
  648. "wc": map[string]interface{}{
  649. "message": "world",
  650. },
  651. }},
  652. {{
  653. "wc": map[string]interface{}{
  654. "message": "golang",
  655. },
  656. }},
  657. {{
  658. "wc": map[string]interface{}{
  659. "message": "peacock",
  660. },
  661. }},
  662. },
  663. M: map[string]interface{}{
  664. "op_2_project_0_exceptions_total": int64(0),
  665. "op_2_project_0_process_latency_us": int64(0),
  666. "op_2_project_0_records_in_total": int64(3),
  667. "op_2_project_0_records_out_total": int64(3),
  668. "sink_mockSink_0_exceptions_total": int64(0),
  669. "sink_mockSink_0_records_in_total": int64(3),
  670. "sink_mockSink_0_records_out_total": int64(3),
  671. },
  672. }, {
  673. Name: `TestRestRule2`,
  674. Sql: `SELECT objectDetectFromGrpc(cmd, base64_img)->result FROM commands`,
  675. R: [][]map[string]interface{}{
  676. {{
  677. "kuiper_field_0": "get success",
  678. }},
  679. {{
  680. "kuiper_field_0": "detect success",
  681. }},
  682. {{
  683. "kuiper_field_0": "delete success",
  684. }},
  685. },
  686. M: map[string]interface{}{
  687. "op_2_project_0_exceptions_total": int64(0),
  688. "op_2_project_0_process_latency_us": int64(0),
  689. "op_2_project_0_records_in_total": int64(3),
  690. "op_2_project_0_records_out_total": int64(3),
  691. "sink_mockSink_0_exceptions_total": int64(0),
  692. "sink_mockSink_0_records_in_total": int64(3),
  693. "sink_mockSink_0_records_out_total": int64(3),
  694. },
  695. }, {
  696. Name: `TestRestRule3`,
  697. Sql: `SELECT getFeatureFromGrpc(self)->feature[0]->box->h FROM fakeBin`,
  698. R: [][]map[string]interface{}{
  699. {{
  700. "kuiper_field_0": float64(106), // Convert by the testing tool
  701. }},
  702. {{
  703. "kuiper_field_0": float64(107),
  704. }},
  705. {{
  706. "kuiper_field_0": float64(108),
  707. }},
  708. },
  709. M: map[string]interface{}{
  710. "op_2_project_0_exceptions_total": int64(0),
  711. "op_2_project_0_process_latency_us": int64(0),
  712. "op_2_project_0_records_in_total": int64(3),
  713. "op_2_project_0_records_out_total": int64(3),
  714. "sink_mockSink_0_exceptions_total": int64(0),
  715. "sink_mockSink_0_records_in_total": int64(3),
  716. "sink_mockSink_0_records_out_total": int64(3),
  717. },
  718. }, {
  719. Name: `TestRestRule4`,
  720. Sql: `SELECT getStatusFromGrpc(), cmd FROM commands`,
  721. R: [][]map[string]interface{}{
  722. {{
  723. "getStatusFromGrpc": true,
  724. "cmd": "get",
  725. }},
  726. {{
  727. "getStatusFromGrpc": true,
  728. "cmd": "detect",
  729. }},
  730. {{
  731. "getStatusFromGrpc": true,
  732. "cmd": "delete",
  733. }},
  734. },
  735. M: map[string]interface{}{
  736. "op_2_project_0_exceptions_total": int64(0),
  737. "op_2_project_0_process_latency_us": int64(0),
  738. "op_2_project_0_records_in_total": int64(3),
  739. "op_2_project_0_records_out_total": int64(3),
  740. "sink_mockSink_0_exceptions_total": int64(0),
  741. "sink_mockSink_0_records_in_total": int64(3),
  742. "sink_mockSink_0_records_out_total": int64(3),
  743. },
  744. }, {
  745. Name: `TestRestRule5`,
  746. Sql: `SELECT objectDetectFromGrpc(*) -> image AS res FROM optional_commands`,
  747. R: [][]map[string]interface{}{
  748. {{
  749. "res": "my image1",
  750. }},
  751. {{
  752. "res": "my image2",
  753. }},
  754. {{
  755. "res": "my image3",
  756. }},
  757. },
  758. M: map[string]interface{}{
  759. "op_2_project_0_exceptions_total": int64(0),
  760. "op_2_project_0_process_latency_us": int64(0),
  761. "op_2_project_0_records_in_total": int64(3),
  762. "op_2_project_0_records_out_total": int64(3),
  763. "sink_mockSink_0_exceptions_total": int64(0),
  764. "sink_mockSink_0_records_in_total": int64(3),
  765. "sink_mockSink_0_records_out_total": int64(3),
  766. },
  767. },
  768. }
  769. topotest.HandleStream(true, streamList, t)
  770. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  771. BufferLength: 100,
  772. SendError: true,
  773. }, 0)
  774. }