external_service_rule_test.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/golang/protobuf/ptypes/empty"
  20. "github.com/golang/protobuf/ptypes/wrappers"
  21. "github.com/gorilla/mux"
  22. kconf "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/topo/topotest"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. "github.com/msgpack-rpc/msgpack-rpc-go/rpc"
  26. "google.golang.org/grpc"
  27. "io"
  28. "net"
  29. "net/http"
  30. "net/http/httptest"
  31. "reflect"
  32. "strconv"
  33. "testing"
  34. )
  35. type RestHelloRequest struct {
  36. Name string `json:"name,omitempty"`
  37. }
  38. type RestHelloReply struct {
  39. Message string `json:"message,omitempty"`
  40. }
  41. type ObjectDetectRequest struct {
  42. Command string `json:"cmd,omitempty"`
  43. Image string `json:"base64_img,omitempty"`
  44. }
  45. type ObjectDetectResponse struct {
  46. Info string `json:"cmd,omitempty"`
  47. Code int `json:"base64_img,omitempty"`
  48. Image string `json:"image,omitempty"`
  49. Result string `json:"result,omitempty"`
  50. Type string `json:"type,omitempty"`
  51. }
  52. //type Box struct {
  53. // X int32 `json:"x,omitempty"`
  54. // Y int32 `json:"y,omitempty"`
  55. // W int32 `json:"w,omitempty"`
  56. // H int32 `json:"h,omitempty"`
  57. //}
  58. //type FeatureResult struct {
  59. // Features []float64 `json:"features,omitempty"`
  60. // Box Box `json:"box,omitempty"`
  61. //}
  62. type EncodedRequest struct {
  63. Name string `json:"name,omitempty"`
  64. Size int `json:"size,omitempty"`
  65. }
  66. type ShelfMessage struct {
  67. Id string `json:"id,omitempty"`
  68. Theme string `json:"theme,omitempty"`
  69. }
  70. type ShelfMessageOut struct {
  71. Id int64 `json:"id,omitempty"`
  72. Theme string `json:"theme,omitempty"`
  73. }
  74. type BookMessage struct {
  75. Id int64 `json:"id,omitempty"`
  76. Author string `json:"author,omitempty"`
  77. Title string `json:"title,omitempty"`
  78. }
  79. type MessageMessage struct {
  80. Text string `json:"text,omitempty"`
  81. }
  82. func TestRestService(t *testing.T) {
  83. // mock server, the port is set in the sample.json
  84. l, err := net.Listen("tcp", "127.0.0.1:51234")
  85. if err != nil {
  86. t.Error(err)
  87. t.FailNow()
  88. }
  89. count := 0
  90. router := mux.NewRouter()
  91. router.HandleFunc("/SayHello", func(w http.ResponseWriter, r *http.Request) {
  92. body := &RestHelloRequest{}
  93. err := json.NewDecoder(r.Body).Decode(body)
  94. if err != nil {
  95. http.Error(w, err.Error(), http.StatusBadRequest)
  96. }
  97. out := &RestHelloReply{Message: body.Name}
  98. jsonOut(w, err, out)
  99. }).Methods(http.MethodPost)
  100. router.HandleFunc("/object_detection", func(w http.ResponseWriter, r *http.Request) {
  101. req := &ObjectDetectRequest{}
  102. err := json.NewDecoder(r.Body).Decode(req)
  103. if err != nil {
  104. http.Error(w, err.Error(), http.StatusBadRequest)
  105. }
  106. if req.Image == "" {
  107. http.Error(w, "image is not found", http.StatusBadRequest)
  108. }
  109. out := &ObjectDetectResponse{
  110. Info: req.Command,
  111. Code: 200,
  112. Image: req.Image,
  113. Result: req.Command + " success",
  114. Type: "S",
  115. }
  116. jsonOut(w, err, out)
  117. }).Methods(http.MethodPost)
  118. router.HandleFunc("/getStatus", func(w http.ResponseWriter, r *http.Request) {
  119. result := count%2 == 0
  120. count++
  121. io.WriteString(w, fmt.Sprintf("%v", result))
  122. }).Methods(http.MethodPost)
  123. router.HandleFunc("/RestEncodedJson", func(w http.ResponseWriter, r *http.Request) {
  124. req := &EncodedRequest{}
  125. err := json.NewDecoder(r.Body).Decode(req)
  126. if err != nil {
  127. http.Error(w, err.Error(), http.StatusBadRequest)
  128. }
  129. io.WriteString(w, req.Name)
  130. }).Methods(http.MethodPost)
  131. router.HandleFunc("/bookshelf/v1/shelves", func(w http.ResponseWriter, r *http.Request) {
  132. req := &ShelfMessage{}
  133. err := json.NewDecoder(r.Body).Decode(req)
  134. if err != nil {
  135. http.Error(w, err.Error(), http.StatusBadRequest)
  136. }
  137. if req.Id == "" || req.Theme == "" {
  138. http.Error(w, "empty request", http.StatusBadRequest)
  139. }
  140. idint, _ := strconv.Atoi(req.Id)
  141. out := ShelfMessageOut{Id: int64(idint), Theme: req.Theme}
  142. jsonOut(w, err, out)
  143. }).Methods(http.MethodPost)
  144. router.HandleFunc("/bookshelf/v1/shelves/{shelf}/books/{book}", func(w http.ResponseWriter, r *http.Request) {
  145. defer r.Body.Close()
  146. vars := mux.Vars(r)
  147. shelf, book := vars["shelf"], vars["book"]
  148. if shelf == "" || book == "" {
  149. http.Error(w, "empty request", http.StatusBadRequest)
  150. }
  151. idint, _ := strconv.Atoi(book)
  152. out := BookMessage{Id: int64(idint), Author: "NA", Title: "title_" + book}
  153. jsonOut(w, err, out)
  154. }).Methods(http.MethodGet)
  155. router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
  156. defer r.Body.Close()
  157. vars := mux.Vars(r)
  158. name := vars["name"]
  159. if name == "" {
  160. http.Error(w, "empty request", http.StatusBadRequest)
  161. }
  162. out := MessageMessage{Text: name + " content"}
  163. jsonOut(w, err, out)
  164. }).Methods(http.MethodGet)
  165. router.HandleFunc("/messaging/v1/messages/filter/{name}", func(w http.ResponseWriter, r *http.Request) {
  166. defer r.Body.Close()
  167. vars := mux.Vars(r)
  168. name := vars["name"]
  169. q := r.URL.Query()
  170. rev, sub := q.Get("revision"), q.Get("sub.subfield")
  171. if name == "" || rev == "" || sub == "" {
  172. http.Error(w, "empty request", http.StatusBadRequest)
  173. }
  174. out := MessageMessage{Text: name + rev + sub}
  175. jsonOut(w, err, out)
  176. }).Methods(http.MethodGet)
  177. router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
  178. defer r.Body.Close()
  179. vars := mux.Vars(r)
  180. name := vars["name"]
  181. if name == "" {
  182. http.Error(w, "empty request", http.StatusBadRequest)
  183. }
  184. body := &MessageMessage{}
  185. err := json.NewDecoder(r.Body).Decode(body)
  186. if err != nil {
  187. http.Error(w, err.Error(), http.StatusBadRequest)
  188. }
  189. out := MessageMessage{Text: body.Text}
  190. jsonOut(w, err, out)
  191. }).Methods(http.MethodPut, http.MethodPatch)
  192. server := httptest.NewUnstartedServer(router)
  193. server.Listener.Close()
  194. server.Listener = l
  195. // Start the server.
  196. server.Start()
  197. defer server.Close()
  198. //Reset
  199. streamList := []string{"helloStr", "commands", "fakeBin", "shelves", "demo", "mes"}
  200. topotest.HandleStream(false, streamList, t)
  201. //Data setup
  202. var tests = []topotest.RuleTest{
  203. {
  204. Name: `TestRestRule1`,
  205. Sql: `SELECT helloFromRest(name) as wc FROM helloStr`,
  206. R: [][]map[string]interface{}{
  207. {{
  208. "wc": map[string]interface{}{
  209. "message": "world",
  210. },
  211. }},
  212. {{
  213. "wc": map[string]interface{}{
  214. "message": "golang",
  215. },
  216. }},
  217. {{
  218. "wc": map[string]interface{}{
  219. "message": "peacock",
  220. },
  221. }},
  222. },
  223. M: map[string]interface{}{
  224. "op_2_project_0_exceptions_total": int64(0),
  225. "op_2_project_0_process_latency_us": int64(0),
  226. "op_2_project_0_records_in_total": int64(3),
  227. "op_2_project_0_records_out_total": int64(3),
  228. "sink_mockSink_0_exceptions_total": int64(0),
  229. "sink_mockSink_0_records_in_total": int64(3),
  230. "sink_mockSink_0_records_out_total": int64(3),
  231. },
  232. }, {
  233. Name: `TestRestRule2`,
  234. Sql: `SELECT objectDetectFromRest(cmd, base64_img)->result FROM commands`,
  235. R: [][]map[string]interface{}{
  236. {{
  237. "kuiper_field_0": "get success",
  238. }},
  239. {{
  240. "kuiper_field_0": "detect success",
  241. }},
  242. {{
  243. "kuiper_field_0": "delete success",
  244. }},
  245. },
  246. M: map[string]interface{}{
  247. "op_2_project_0_exceptions_total": int64(0),
  248. "op_2_project_0_process_latency_us": int64(0),
  249. "op_2_project_0_records_in_total": int64(3),
  250. "op_2_project_0_records_out_total": int64(3),
  251. "sink_mockSink_0_exceptions_total": int64(0),
  252. "sink_mockSink_0_records_in_total": int64(3),
  253. "sink_mockSink_0_records_out_total": int64(3),
  254. },
  255. }, {
  256. Name: `TestRestRule3`,
  257. Sql: `SELECT objectDetectFromRest(*)->result FROM commands`,
  258. R: [][]map[string]interface{}{
  259. {{
  260. "kuiper_field_0": "get success",
  261. }},
  262. {{
  263. "kuiper_field_0": "detect success",
  264. }},
  265. {{
  266. "kuiper_field_0": "delete success",
  267. }},
  268. },
  269. M: map[string]interface{}{
  270. "op_2_project_0_exceptions_total": int64(0),
  271. "op_2_project_0_process_latency_us": int64(0),
  272. "op_2_project_0_records_in_total": int64(3),
  273. "op_2_project_0_records_out_total": int64(3),
  274. "sink_mockSink_0_exceptions_total": int64(0),
  275. "sink_mockSink_0_records_in_total": int64(3),
  276. "sink_mockSink_0_records_out_total": int64(3),
  277. },
  278. //}, {
  279. // Name: `TestRestRule3`,
  280. // Sql: `SELECT getFeatureFromRest(self)->feature[0]->box->h FROM fakeBin`,
  281. // R: [][]map[string]interface{}{
  282. // {{
  283. // "kuiper_field_0": 106,
  284. // }},
  285. // {{
  286. // "kuiper_field_0": 107,
  287. // }},
  288. // {{
  289. // "kuiper_field_0": 108,
  290. // }},
  291. // },
  292. // M: map[string]interface{}{
  293. // "op_2_project_0_exceptions_total": int64(0),
  294. // "op_2_project_0_process_latency_us": int64(0),
  295. // "op_2_project_0_records_in_total": int64(3),
  296. // "op_2_project_0_records_out_total": int64(3),
  297. //
  298. // "sink_mockSink_0_exceptions_total": int64(0),
  299. // "sink_mockSink_0_records_in_total": int64(3),
  300. // "sink_mockSink_0_records_out_total": int64(3),
  301. // },
  302. }, {
  303. Name: `TestRestRule4`,
  304. Sql: `SELECT getStatusFromRest(), cmd FROM commands`,
  305. R: [][]map[string]interface{}{
  306. {{
  307. "getStatusFromRest": true,
  308. "cmd": "get",
  309. }},
  310. {{
  311. "getStatusFromRest": false,
  312. "cmd": "detect",
  313. }},
  314. {{
  315. "getStatusFromRest": true,
  316. "cmd": "delete",
  317. }},
  318. },
  319. M: map[string]interface{}{
  320. "op_2_project_0_exceptions_total": int64(0),
  321. "op_2_project_0_process_latency_us": int64(0),
  322. "op_2_project_0_records_in_total": int64(3),
  323. "op_2_project_0_records_out_total": int64(3),
  324. "sink_mockSink_0_exceptions_total": int64(0),
  325. "sink_mockSink_0_records_in_total": int64(3),
  326. "sink_mockSink_0_records_out_total": int64(3),
  327. },
  328. }, {
  329. Name: `TestRestRule5`,
  330. Sql: `SELECT restEncodedJson(encoded_json) as name FROM commands`,
  331. R: [][]map[string]interface{}{
  332. {{
  333. "name": "name1",
  334. }},
  335. {{
  336. "name": "name2",
  337. }},
  338. {{
  339. "name": "name3",
  340. }},
  341. },
  342. M: map[string]interface{}{
  343. "op_2_project_0_exceptions_total": int64(0),
  344. "op_2_project_0_process_latency_us": int64(0),
  345. "op_2_project_0_records_in_total": int64(3),
  346. "op_2_project_0_records_out_total": int64(3),
  347. "sink_mockSink_0_exceptions_total": int64(0),
  348. "sink_mockSink_0_records_in_total": int64(3),
  349. "sink_mockSink_0_records_out_total": int64(3),
  350. },
  351. }, {
  352. Name: `TestRestRule6`,
  353. Sql: `SELECT CreateShelf(shelf)->theme as theme FROM shelves`,
  354. R: [][]map[string]interface{}{
  355. {{
  356. "theme": "tandra",
  357. }},
  358. {{
  359. "theme": "claro",
  360. }},
  361. {{
  362. "theme": "dark",
  363. }},
  364. },
  365. M: map[string]interface{}{
  366. "op_2_project_0_exceptions_total": int64(0),
  367. "op_2_project_0_process_latency_us": int64(0),
  368. "op_2_project_0_records_in_total": int64(3),
  369. "op_2_project_0_records_out_total": int64(3),
  370. "sink_mockSink_0_exceptions_total": int64(0),
  371. "sink_mockSink_0_records_in_total": int64(3),
  372. "sink_mockSink_0_records_out_total": int64(3),
  373. },
  374. }, {
  375. Name: `TestRestRule7`,
  376. Sql: `SELECT GetBook(size, ts)->title as title FROM demo WHERE size > 3 `,
  377. R: [][]map[string]interface{}{
  378. {{
  379. "title": "title_1541152486822",
  380. }},
  381. {{
  382. "title": "title_1541152488442",
  383. }},
  384. },
  385. M: map[string]interface{}{
  386. "op_2_filter_0_exceptions_total": int64(0),
  387. "op_2_filter_0_process_latency_us": int64(0),
  388. "op_2_filter_0_records_in_total": int64(5),
  389. "op_2_filter_0_records_out_total": int64(2),
  390. "sink_mockSink_0_exceptions_total": int64(0),
  391. "sink_mockSink_0_records_in_total": int64(2),
  392. "sink_mockSink_0_records_out_total": int64(2),
  393. },
  394. }, {
  395. Name: `TestRestRule8`,
  396. Sql: `SELECT GetMessage(concat("messages/",ts))->text as message FROM demo WHERE size > 3`,
  397. R: [][]map[string]interface{}{
  398. {{
  399. "message": "1541152486822 content",
  400. }},
  401. {{
  402. "message": "1541152488442 content",
  403. }},
  404. },
  405. M: map[string]interface{}{
  406. "op_2_filter_0_exceptions_total": int64(0),
  407. "op_2_filter_0_process_latency_us": int64(0),
  408. "op_2_filter_0_records_in_total": int64(5),
  409. "op_2_filter_0_records_out_total": int64(2),
  410. "sink_mockSink_0_exceptions_total": int64(0),
  411. "sink_mockSink_0_records_in_total": int64(2),
  412. "sink_mockSink_0_records_out_total": int64(2),
  413. },
  414. }, {
  415. Name: `TestRestRule9`,
  416. Sql: `SELECT SearchMessage(name, size, shelf)->text as message FROM shelves`,
  417. R: [][]map[string]interface{}{
  418. {{
  419. "message": "name12sub1",
  420. }},
  421. {{
  422. "message": "name23sub2",
  423. }},
  424. {{
  425. "message": "name34sub3",
  426. }},
  427. },
  428. M: map[string]interface{}{
  429. "op_2_project_0_exceptions_total": int64(0),
  430. "op_2_project_0_process_latency_us": int64(0),
  431. "op_2_project_0_records_in_total": int64(3),
  432. "op_2_project_0_records_out_total": int64(3),
  433. "sink_mockSink_0_exceptions_total": int64(0),
  434. "sink_mockSink_0_records_in_total": int64(3),
  435. "sink_mockSink_0_records_out_total": int64(3),
  436. },
  437. // TODO support * as one of the parameters
  438. //},{
  439. // Name: `TestRestRule10`,
  440. // Sql: `SELECT UpdateMessage(message_id, *)->text as message FROM mes`,
  441. // R: [][]map[string]interface{}{
  442. // {{
  443. // "message": "message1",
  444. // }},
  445. // {{
  446. // "message": "message2",
  447. // }},
  448. // {{
  449. // "message": "message3",
  450. // }},
  451. // },
  452. // M: map[string]interface{}{
  453. // "op_2_project_0_exceptions_total": int64(0),
  454. // "op_2_project_0_process_latency_us": int64(0),
  455. // "op_2_project_0_records_in_total": int64(3),
  456. // "op_2_project_0_records_out_total": int64(3),
  457. //
  458. // "sink_mockSink_0_exceptions_total": int64(0),
  459. // "sink_mockSink_0_records_in_total": int64(3),
  460. // "sink_mockSink_0_records_out_total": int64(3),
  461. // },
  462. }, {
  463. Name: `TestRestRule11`,
  464. Sql: `SELECT PatchMessage(message_id, text)->text as message FROM mes`,
  465. R: [][]map[string]interface{}{
  466. {{
  467. "message": "message1",
  468. }},
  469. {{
  470. "message": "message2",
  471. }},
  472. {{
  473. "message": "message3",
  474. }},
  475. },
  476. M: map[string]interface{}{
  477. "op_2_project_0_exceptions_total": int64(0),
  478. "op_2_project_0_process_latency_us": int64(0),
  479. "op_2_project_0_records_in_total": int64(3),
  480. "op_2_project_0_records_out_total": int64(3),
  481. "sink_mockSink_0_exceptions_total": int64(0),
  482. "sink_mockSink_0_records_in_total": int64(3),
  483. "sink_mockSink_0_records_out_total": int64(3),
  484. },
  485. },
  486. }
  487. topotest.HandleStream(true, streamList, t)
  488. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  489. BufferLength: 100,
  490. SendError: true,
  491. }, 0)
  492. }
  493. func jsonOut(w http.ResponseWriter, err error, out interface{}) {
  494. w.Header().Add("Content-Type", "application/json")
  495. enc := json.NewEncoder(w)
  496. err = enc.Encode(out)
  497. // Problems encoding
  498. if err != nil {
  499. http.Error(w, err.Error(), http.StatusBadRequest)
  500. }
  501. }
  502. type Resolver map[string]reflect.Value
  503. func (self Resolver) Resolve(name string, _ []reflect.Value) (reflect.Value, error) {
  504. return self[name], nil
  505. }
  506. func SayHello(name string) map[string]interface{} {
  507. return map[string]interface{}{
  508. "message": name,
  509. }
  510. }
  511. func get_feature(img []byte) map[string]interface{} {
  512. l := len(string(img))
  513. return map[string]interface{}{
  514. "feature": []map[string]interface{}{
  515. {
  516. "features": []float64{-1.444, 2.55452, 5.121},
  517. "box": map[string]interface{}{
  518. "x": 153,
  519. "y": 107,
  520. "w": 174,
  521. "h": 100 + l,
  522. },
  523. }, {
  524. "features": []float64{1.444, -2.55452, -5.121},
  525. "box": map[string]interface{}{
  526. "x": 257,
  527. "y": 92,
  528. "w": 169,
  529. "h": 208,
  530. },
  531. },
  532. },
  533. }
  534. }
  535. func object_detection(command string, image string) map[string]interface{} {
  536. out := map[string]interface{}{
  537. "info": command,
  538. "code": 200,
  539. "image": image,
  540. "result": command + " success",
  541. "type": "S",
  542. }
  543. return out
  544. }
  545. func getStatus() bool {
  546. return true
  547. }
  548. func TestMsgpackService(t *testing.T) {
  549. // mock server
  550. res := Resolver{"SayHello": reflect.ValueOf(SayHello), "object_detection": reflect.ValueOf(object_detection), "get_feature": reflect.ValueOf(get_feature), "getStatus": reflect.ValueOf(getStatus)}
  551. serv := rpc.NewServer(res, true, nil)
  552. l, _ := net.Listen("tcp", ":50000")
  553. serv.Listen(l)
  554. go serv.Run()
  555. // Comment out because the bug in the msgpack rpc
  556. // defer serv.Stop()
  557. //Reset
  558. streamList := []string{"helloStr", "commands", "fakeBin"}
  559. topotest.HandleStream(false, streamList, t)
  560. //Data setup
  561. var tests = []topotest.RuleTest{
  562. {
  563. Name: `TestRestRule1`,
  564. Sql: `SELECT helloFromMsgpack(name) as wc FROM helloStr`,
  565. R: [][]map[string]interface{}{
  566. {{
  567. "wc": map[string]interface{}{
  568. "message": "world",
  569. },
  570. }},
  571. {{
  572. "wc": map[string]interface{}{
  573. "message": "golang",
  574. },
  575. }},
  576. {{
  577. "wc": map[string]interface{}{
  578. "message": "peacock",
  579. },
  580. }},
  581. },
  582. M: map[string]interface{}{
  583. "op_2_project_0_exceptions_total": int64(0),
  584. "op_2_project_0_process_latency_us": int64(0),
  585. "op_2_project_0_records_in_total": int64(3),
  586. "op_2_project_0_records_out_total": int64(3),
  587. "sink_mockSink_0_exceptions_total": int64(0),
  588. "sink_mockSink_0_records_in_total": int64(3),
  589. "sink_mockSink_0_records_out_total": int64(3),
  590. },
  591. }, {
  592. Name: `TestRestRule2`,
  593. Sql: `SELECT objectDetectFromMsgpack(*)->result FROM commands`,
  594. R: [][]map[string]interface{}{
  595. {{
  596. "kuiper_field_0": "get success",
  597. }},
  598. {{
  599. "kuiper_field_0": "detect success",
  600. }},
  601. {{
  602. "kuiper_field_0": "delete success",
  603. }},
  604. },
  605. M: map[string]interface{}{
  606. "op_2_project_0_exceptions_total": int64(0),
  607. "op_2_project_0_process_latency_us": int64(0),
  608. "op_2_project_0_records_in_total": int64(3),
  609. "op_2_project_0_records_out_total": int64(3),
  610. "sink_mockSink_0_exceptions_total": int64(0),
  611. "sink_mockSink_0_records_in_total": int64(3),
  612. "sink_mockSink_0_records_out_total": int64(3),
  613. },
  614. }, {
  615. Name: `TestRestRule3`,
  616. Sql: `SELECT getFeatureFromMsgpack(self)->feature[0]->box->h FROM fakeBin`,
  617. R: [][]map[string]interface{}{
  618. {{
  619. "kuiper_field_0": float64(106), //Convert by the testing tool
  620. }},
  621. {{
  622. "kuiper_field_0": float64(107),
  623. }},
  624. {{
  625. "kuiper_field_0": float64(108),
  626. }},
  627. },
  628. M: map[string]interface{}{
  629. "op_2_project_0_exceptions_total": int64(0),
  630. "op_2_project_0_process_latency_us": int64(0),
  631. "op_2_project_0_records_in_total": int64(3),
  632. "op_2_project_0_records_out_total": int64(3),
  633. "sink_mockSink_0_exceptions_total": int64(0),
  634. "sink_mockSink_0_records_in_total": int64(3),
  635. "sink_mockSink_0_records_out_total": int64(3),
  636. },
  637. //}, {
  638. // Name: `TestRestRule4`,
  639. // Sql: `SELECT getStatusFromMsgpack(), command FROM commands`,
  640. // R: [][]map[string]interface{}{
  641. // {{
  642. // "getStatusFromRest": true,
  643. // "command": "get",
  644. // }},
  645. // {{
  646. // "getStatusFromRest": true,
  647. // "command": "detect",
  648. // }},
  649. // {{
  650. // "getStatusFromRest": true,
  651. // "command": "delete",
  652. // }},
  653. // },
  654. // M: map[string]interface{}{
  655. // "op_2_project_0_exceptions_total": int64(0),
  656. // "op_2_project_0_process_latency_us": int64(0),
  657. // "op_2_project_0_records_in_total": int64(3),
  658. // "op_2_project_0_records_out_total": int64(3),
  659. //
  660. // "sink_mockSink_0_exceptions_total": int64(0),
  661. // "sink_mockSink_0_records_in_total": int64(3),
  662. // "sink_mockSink_0_records_out_total": int64(3),
  663. // },
  664. },
  665. }
  666. topotest.HandleStream(true, streamList, t)
  667. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  668. BufferLength: 100,
  669. SendError: true,
  670. }, 0)
  671. }
  672. type server struct {
  673. UnimplementedGreeterServer
  674. }
  675. func (s *server) SayHello(_ context.Context, in *HelloRequest) (*HelloReply, error) {
  676. return &HelloReply{Message: in.GetName()}, nil
  677. }
  678. func (s *server) ObjectDetection(_ context.Context, in *ObjectDetectionRequest) (*ObjectDetectionResponse, error) {
  679. return &ObjectDetectionResponse{
  680. Info: in.Cmd,
  681. Code: 200,
  682. Image: in.Base64Img,
  683. Result: in.Cmd + " success",
  684. Type: "S",
  685. }, nil
  686. }
  687. func (s *server) GetFeature(_ context.Context, v *wrappers.BytesValue) (*FeatureResponse, error) {
  688. l := len(string(v.Value))
  689. return &FeatureResponse{
  690. Feature: []*FeatureResult{
  691. {
  692. Features: []float32{-1.444, 2.55452, 5.121},
  693. Box: &Box{
  694. X: 153,
  695. Y: 107,
  696. W: 174,
  697. H: int32(100 + l),
  698. },
  699. },
  700. {
  701. Features: []float32{1.444, -2.55452, -5.121},
  702. Box: &Box{
  703. X: 257,
  704. Y: 92,
  705. W: 169,
  706. H: 208,
  707. },
  708. },
  709. },
  710. }, nil
  711. }
  712. func (s *server) GetStatus(context.Context, *empty.Empty) (*wrappers.BoolValue, error) {
  713. return &wrappers.BoolValue{Value: true}, nil
  714. }
  715. func TestGrpcService(t *testing.T) {
  716. lis, err := net.Listen("tcp", ":50051")
  717. if err != nil {
  718. kconf.Log.Fatalf("failed to listen: %v", err)
  719. }
  720. s := grpc.NewServer()
  721. RegisterGreeterServer(s, &server{})
  722. go func() {
  723. if err := s.Serve(lis); err != nil {
  724. kconf.Log.Fatalf("failed to serve: %v", err)
  725. }
  726. }()
  727. defer s.Stop()
  728. //Reset
  729. streamList := []string{"helloStr", "commands", "fakeBin"}
  730. topotest.HandleStream(false, streamList, t)
  731. //Data setup
  732. var tests = []topotest.RuleTest{
  733. {
  734. Name: `TestRestRule1`,
  735. Sql: `SELECT helloFromGrpc(name) as wc FROM helloStr`,
  736. R: [][]map[string]interface{}{
  737. {{
  738. "wc": map[string]interface{}{
  739. "message": "world",
  740. },
  741. }},
  742. {{
  743. "wc": map[string]interface{}{
  744. "message": "golang",
  745. },
  746. }},
  747. {{
  748. "wc": map[string]interface{}{
  749. "message": "peacock",
  750. },
  751. }},
  752. },
  753. M: map[string]interface{}{
  754. "op_2_project_0_exceptions_total": int64(0),
  755. "op_2_project_0_process_latency_us": int64(0),
  756. "op_2_project_0_records_in_total": int64(3),
  757. "op_2_project_0_records_out_total": int64(3),
  758. "sink_mockSink_0_exceptions_total": int64(0),
  759. "sink_mockSink_0_records_in_total": int64(3),
  760. "sink_mockSink_0_records_out_total": int64(3),
  761. },
  762. }, {
  763. Name: `TestRestRule2`,
  764. Sql: `SELECT objectDetectFromGrpc(cmd, base64_img)->result FROM commands`,
  765. R: [][]map[string]interface{}{
  766. {{
  767. "kuiper_field_0": "get success",
  768. }},
  769. {{
  770. "kuiper_field_0": "detect success",
  771. }},
  772. {{
  773. "kuiper_field_0": "delete success",
  774. }},
  775. },
  776. M: map[string]interface{}{
  777. "op_2_project_0_exceptions_total": int64(0),
  778. "op_2_project_0_process_latency_us": int64(0),
  779. "op_2_project_0_records_in_total": int64(3),
  780. "op_2_project_0_records_out_total": int64(3),
  781. "sink_mockSink_0_exceptions_total": int64(0),
  782. "sink_mockSink_0_records_in_total": int64(3),
  783. "sink_mockSink_0_records_out_total": int64(3),
  784. },
  785. }, {
  786. Name: `TestRestRule3`,
  787. Sql: `SELECT getFeatureFromGrpc(self)->feature[0]->box->h FROM fakeBin`,
  788. R: [][]map[string]interface{}{
  789. {{
  790. "kuiper_field_0": float64(106), //Convert by the testing tool
  791. }},
  792. {{
  793. "kuiper_field_0": float64(107),
  794. }},
  795. {{
  796. "kuiper_field_0": float64(108),
  797. }},
  798. },
  799. M: map[string]interface{}{
  800. "op_2_project_0_exceptions_total": int64(0),
  801. "op_2_project_0_process_latency_us": int64(0),
  802. "op_2_project_0_records_in_total": int64(3),
  803. "op_2_project_0_records_out_total": int64(3),
  804. "sink_mockSink_0_exceptions_total": int64(0),
  805. "sink_mockSink_0_records_in_total": int64(3),
  806. "sink_mockSink_0_records_out_total": int64(3),
  807. },
  808. }, {
  809. Name: `TestRestRule4`,
  810. Sql: `SELECT getStatusFromGrpc(), cmd FROM commands`,
  811. R: [][]map[string]interface{}{
  812. {{
  813. "getStatusFromGrpc": true,
  814. "cmd": "get",
  815. }},
  816. {{
  817. "getStatusFromGrpc": true,
  818. "cmd": "detect",
  819. }},
  820. {{
  821. "getStatusFromGrpc": true,
  822. "cmd": "delete",
  823. }},
  824. },
  825. M: map[string]interface{}{
  826. "op_2_project_0_exceptions_total": int64(0),
  827. "op_2_project_0_process_latency_us": int64(0),
  828. "op_2_project_0_records_in_total": int64(3),
  829. "op_2_project_0_records_out_total": int64(3),
  830. "sink_mockSink_0_exceptions_total": int64(0),
  831. "sink_mockSink_0_records_in_total": int64(3),
  832. "sink_mockSink_0_records_out_total": int64(3),
  833. },
  834. },
  835. }
  836. topotest.HandleStream(true, streamList, t)
  837. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  838. BufferLength: 100,
  839. SendError: true,
  840. }, 0)
  841. }