external_service_rule_test.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/golang/protobuf/ptypes/empty"
  20. "github.com/golang/protobuf/ptypes/wrappers"
  21. "github.com/gorilla/mux"
  22. kconf "github.com/lf-edge/ekuiper/internal/conf"
  23. pb "github.com/lf-edge/ekuiper/internal/service/test/schemas/helloworld"
  24. "github.com/lf-edge/ekuiper/internal/topo/topotest"
  25. "github.com/lf-edge/ekuiper/pkg/api"
  26. "github.com/msgpack-rpc/msgpack-rpc-go/rpc"
  27. "google.golang.org/grpc"
  28. "io"
  29. "net"
  30. "net/http"
  31. "net/http/httptest"
  32. "reflect"
  33. "strconv"
  34. "testing"
  35. )
  36. type HelloRequest struct {
  37. Name string `json:"name,omitempty"`
  38. }
  39. type HelloReply struct {
  40. Message string `json:"message,omitempty"`
  41. }
  42. type ObjectDetectRequest struct {
  43. Command string `json:"cmd,omitempty"`
  44. Image string `json:"base64_img,omitempty"`
  45. }
  46. type ObjectDetectResponse struct {
  47. Info string `json:"cmd,omitempty"`
  48. Code int `json:"base64_img,omitempty"`
  49. Image string `json:"image,omitempty"`
  50. Result string `json:"result,omitempty"`
  51. Type string `json:"type,omitempty"`
  52. }
  53. //type Box struct {
  54. // X int32 `json:"x,omitempty"`
  55. // Y int32 `json:"y,omitempty"`
  56. // W int32 `json:"w,omitempty"`
  57. // H int32 `json:"h,omitempty"`
  58. //}
  59. //type FeatureResult struct {
  60. // Features []float64 `json:"features,omitempty"`
  61. // Box Box `json:"box,omitempty"`
  62. //}
  63. type EncodedRequest struct {
  64. Name string `json:"name,omitempty"`
  65. Size int `json:"size,omitempty"`
  66. }
  67. type ShelfMessage struct {
  68. Id string `json:"id,omitempty"`
  69. Theme string `json:"theme,omitempty"`
  70. }
  71. type ShelfMessageOut struct {
  72. Id int64 `json:"id,omitempty"`
  73. Theme string `json:"theme,omitempty"`
  74. }
  75. type BookMessage struct {
  76. Id int64 `json:"id,omitempty"`
  77. Author string `json:"author,omitempty"`
  78. Title string `json:"title,omitempty"`
  79. }
  80. type MessageMessage struct {
  81. Text string `json:"text,omitempty"`
  82. }
  83. func TestRestService(t *testing.T) {
  84. // mock server, the port is set in the sample.json
  85. l, err := net.Listen("tcp", "127.0.0.1:51234")
  86. if err != nil {
  87. t.Error(err)
  88. t.FailNow()
  89. }
  90. count := 0
  91. router := mux.NewRouter()
  92. router.HandleFunc("/SayHello", func(w http.ResponseWriter, r *http.Request) {
  93. body := &HelloRequest{}
  94. err := json.NewDecoder(r.Body).Decode(body)
  95. if err != nil {
  96. http.Error(w, err.Error(), http.StatusBadRequest)
  97. }
  98. out := &HelloReply{Message: body.Name}
  99. jsonOut(w, err, out)
  100. }).Methods(http.MethodPost)
  101. router.HandleFunc("/object_detection", func(w http.ResponseWriter, r *http.Request) {
  102. req := &ObjectDetectRequest{}
  103. err := json.NewDecoder(r.Body).Decode(req)
  104. if err != nil {
  105. http.Error(w, err.Error(), http.StatusBadRequest)
  106. }
  107. if req.Image == "" {
  108. http.Error(w, "image is not found", http.StatusBadRequest)
  109. }
  110. out := &ObjectDetectResponse{
  111. Info: req.Command,
  112. Code: 200,
  113. Image: req.Image,
  114. Result: req.Command + " success",
  115. Type: "S",
  116. }
  117. jsonOut(w, err, out)
  118. }).Methods(http.MethodPost)
  119. router.HandleFunc("/getStatus", func(w http.ResponseWriter, r *http.Request) {
  120. result := count%2 == 0
  121. count++
  122. io.WriteString(w, fmt.Sprintf("%v", result))
  123. }).Methods(http.MethodPost)
  124. router.HandleFunc("/RestEncodedJson", func(w http.ResponseWriter, r *http.Request) {
  125. req := &EncodedRequest{}
  126. err := json.NewDecoder(r.Body).Decode(req)
  127. if err != nil {
  128. http.Error(w, err.Error(), http.StatusBadRequest)
  129. }
  130. io.WriteString(w, req.Name)
  131. }).Methods(http.MethodPost)
  132. router.HandleFunc("/bookshelf/v1/shelves", func(w http.ResponseWriter, r *http.Request) {
  133. req := &ShelfMessage{}
  134. err := json.NewDecoder(r.Body).Decode(req)
  135. if err != nil {
  136. http.Error(w, err.Error(), http.StatusBadRequest)
  137. }
  138. if req.Id == "" || req.Theme == "" {
  139. http.Error(w, "empty request", http.StatusBadRequest)
  140. }
  141. idint, _ := strconv.Atoi(req.Id)
  142. out := ShelfMessageOut{Id: int64(idint), Theme: req.Theme}
  143. jsonOut(w, err, out)
  144. }).Methods(http.MethodPost)
  145. router.HandleFunc("/bookshelf/v1/shelves/{shelf}/books/{book}", func(w http.ResponseWriter, r *http.Request) {
  146. defer r.Body.Close()
  147. vars := mux.Vars(r)
  148. shelf, book := vars["shelf"], vars["book"]
  149. if shelf == "" || book == "" {
  150. http.Error(w, "empty request", http.StatusBadRequest)
  151. }
  152. idint, _ := strconv.Atoi(book)
  153. out := BookMessage{Id: int64(idint), Author: "NA", Title: "title_" + book}
  154. jsonOut(w, err, out)
  155. }).Methods(http.MethodGet)
  156. router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
  157. defer r.Body.Close()
  158. vars := mux.Vars(r)
  159. name := vars["name"]
  160. if name == "" {
  161. http.Error(w, "empty request", http.StatusBadRequest)
  162. }
  163. out := MessageMessage{Text: name + " content"}
  164. jsonOut(w, err, out)
  165. }).Methods(http.MethodGet)
  166. router.HandleFunc("/messaging/v1/messages/filter/{name}", func(w http.ResponseWriter, r *http.Request) {
  167. defer r.Body.Close()
  168. vars := mux.Vars(r)
  169. name := vars["name"]
  170. q := r.URL.Query()
  171. rev, sub := q.Get("revision"), q.Get("sub.subfield")
  172. if name == "" || rev == "" || sub == "" {
  173. http.Error(w, "empty request", http.StatusBadRequest)
  174. }
  175. out := MessageMessage{Text: name + rev + sub}
  176. jsonOut(w, err, out)
  177. }).Methods(http.MethodGet)
  178. router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
  179. defer r.Body.Close()
  180. vars := mux.Vars(r)
  181. name := vars["name"]
  182. if name == "" {
  183. http.Error(w, "empty request", http.StatusBadRequest)
  184. }
  185. body := &MessageMessage{}
  186. err := json.NewDecoder(r.Body).Decode(body)
  187. if err != nil {
  188. http.Error(w, err.Error(), http.StatusBadRequest)
  189. }
  190. out := MessageMessage{Text: body.Text}
  191. jsonOut(w, err, out)
  192. }).Methods(http.MethodPut, http.MethodPatch)
  193. server := httptest.NewUnstartedServer(router)
  194. server.Listener.Close()
  195. server.Listener = l
  196. // Start the server.
  197. server.Start()
  198. defer server.Close()
  199. //Reset
  200. streamList := []string{"helloStr", "commands", "fakeBin", "shelves", "demo", "mes"}
  201. topotest.HandleStream(false, streamList, t)
  202. //Data setup
  203. var tests = []topotest.RuleTest{
  204. {
  205. Name: `TestRestRule1`,
  206. Sql: `SELECT helloFromRest(name) as wc FROM helloStr`,
  207. R: [][]map[string]interface{}{
  208. {{
  209. "wc": map[string]interface{}{
  210. "message": "world",
  211. },
  212. }},
  213. {{
  214. "wc": map[string]interface{}{
  215. "message": "golang",
  216. },
  217. }},
  218. {{
  219. "wc": map[string]interface{}{
  220. "message": "peacock",
  221. },
  222. }},
  223. },
  224. M: map[string]interface{}{
  225. "op_2_project_0_exceptions_total": int64(0),
  226. "op_2_project_0_process_latency_us": int64(0),
  227. "op_2_project_0_records_in_total": int64(3),
  228. "op_2_project_0_records_out_total": int64(3),
  229. "sink_mockSink_0_exceptions_total": int64(0),
  230. "sink_mockSink_0_records_in_total": int64(3),
  231. "sink_mockSink_0_records_out_total": int64(3),
  232. },
  233. }, {
  234. Name: `TestRestRule2`,
  235. Sql: `SELECT objectDetectFromRest(cmd, base64_img)->result FROM commands`,
  236. R: [][]map[string]interface{}{
  237. {{
  238. "kuiper_field_0": "get success",
  239. }},
  240. {{
  241. "kuiper_field_0": "detect success",
  242. }},
  243. {{
  244. "kuiper_field_0": "delete success",
  245. }},
  246. },
  247. M: map[string]interface{}{
  248. "op_2_project_0_exceptions_total": int64(0),
  249. "op_2_project_0_process_latency_us": int64(0),
  250. "op_2_project_0_records_in_total": int64(3),
  251. "op_2_project_0_records_out_total": int64(3),
  252. "sink_mockSink_0_exceptions_total": int64(0),
  253. "sink_mockSink_0_records_in_total": int64(3),
  254. "sink_mockSink_0_records_out_total": int64(3),
  255. },
  256. }, {
  257. Name: `TestRestRule3`,
  258. Sql: `SELECT objectDetectFromRest(*)->result FROM commands`,
  259. R: [][]map[string]interface{}{
  260. {{
  261. "kuiper_field_0": "get success",
  262. }},
  263. {{
  264. "kuiper_field_0": "detect success",
  265. }},
  266. {{
  267. "kuiper_field_0": "delete success",
  268. }},
  269. },
  270. M: map[string]interface{}{
  271. "op_2_project_0_exceptions_total": int64(0),
  272. "op_2_project_0_process_latency_us": int64(0),
  273. "op_2_project_0_records_in_total": int64(3),
  274. "op_2_project_0_records_out_total": int64(3),
  275. "sink_mockSink_0_exceptions_total": int64(0),
  276. "sink_mockSink_0_records_in_total": int64(3),
  277. "sink_mockSink_0_records_out_total": int64(3),
  278. },
  279. //}, {
  280. // Name: `TestRestRule3`,
  281. // Sql: `SELECT getFeatureFromRest(self)->feature[0]->box->h FROM fakeBin`,
  282. // R: [][]map[string]interface{}{
  283. // {{
  284. // "kuiper_field_0": 106,
  285. // }},
  286. // {{
  287. // "kuiper_field_0": 107,
  288. // }},
  289. // {{
  290. // "kuiper_field_0": 108,
  291. // }},
  292. // },
  293. // M: map[string]interface{}{
  294. // "op_2_project_0_exceptions_total": int64(0),
  295. // "op_2_project_0_process_latency_us": int64(0),
  296. // "op_2_project_0_records_in_total": int64(3),
  297. // "op_2_project_0_records_out_total": int64(3),
  298. //
  299. // "sink_mockSink_0_exceptions_total": int64(0),
  300. // "sink_mockSink_0_records_in_total": int64(3),
  301. // "sink_mockSink_0_records_out_total": int64(3),
  302. // },
  303. }, {
  304. Name: `TestRestRule4`,
  305. Sql: `SELECT getStatusFromRest(), cmd FROM commands`,
  306. R: [][]map[string]interface{}{
  307. {{
  308. "getStatusFromRest": true,
  309. "cmd": "get",
  310. }},
  311. {{
  312. "getStatusFromRest": false,
  313. "cmd": "detect",
  314. }},
  315. {{
  316. "getStatusFromRest": true,
  317. "cmd": "delete",
  318. }},
  319. },
  320. M: map[string]interface{}{
  321. "op_2_project_0_exceptions_total": int64(0),
  322. "op_2_project_0_process_latency_us": int64(0),
  323. "op_2_project_0_records_in_total": int64(3),
  324. "op_2_project_0_records_out_total": int64(3),
  325. "sink_mockSink_0_exceptions_total": int64(0),
  326. "sink_mockSink_0_records_in_total": int64(3),
  327. "sink_mockSink_0_records_out_total": int64(3),
  328. },
  329. }, {
  330. Name: `TestRestRule5`,
  331. Sql: `SELECT restEncodedJson(encoded_json) as name FROM commands`,
  332. R: [][]map[string]interface{}{
  333. {{
  334. "name": "name1",
  335. }},
  336. {{
  337. "name": "name2",
  338. }},
  339. {{
  340. "name": "name3",
  341. }},
  342. },
  343. M: map[string]interface{}{
  344. "op_2_project_0_exceptions_total": int64(0),
  345. "op_2_project_0_process_latency_us": int64(0),
  346. "op_2_project_0_records_in_total": int64(3),
  347. "op_2_project_0_records_out_total": int64(3),
  348. "sink_mockSink_0_exceptions_total": int64(0),
  349. "sink_mockSink_0_records_in_total": int64(3),
  350. "sink_mockSink_0_records_out_total": int64(3),
  351. },
  352. }, {
  353. Name: `TestRestRule6`,
  354. Sql: `SELECT CreateShelf(shelf)->theme as theme FROM shelves`,
  355. R: [][]map[string]interface{}{
  356. {{
  357. "theme": "tandra",
  358. }},
  359. {{
  360. "theme": "claro",
  361. }},
  362. {{
  363. "theme": "dark",
  364. }},
  365. },
  366. M: map[string]interface{}{
  367. "op_2_project_0_exceptions_total": int64(0),
  368. "op_2_project_0_process_latency_us": int64(0),
  369. "op_2_project_0_records_in_total": int64(3),
  370. "op_2_project_0_records_out_total": int64(3),
  371. "sink_mockSink_0_exceptions_total": int64(0),
  372. "sink_mockSink_0_records_in_total": int64(3),
  373. "sink_mockSink_0_records_out_total": int64(3),
  374. },
  375. }, {
  376. Name: `TestRestRule7`,
  377. Sql: `SELECT GetBook(size, ts)->title as title FROM demo WHERE size > 3 `,
  378. R: [][]map[string]interface{}{
  379. {{
  380. "title": "title_1541152486822",
  381. }},
  382. {{
  383. "title": "title_1541152488442",
  384. }},
  385. },
  386. M: map[string]interface{}{
  387. "op_2_filter_0_exceptions_total": int64(0),
  388. "op_2_filter_0_process_latency_us": int64(0),
  389. "op_2_filter_0_records_in_total": int64(5),
  390. "op_2_filter_0_records_out_total": int64(2),
  391. "sink_mockSink_0_exceptions_total": int64(0),
  392. "sink_mockSink_0_records_in_total": int64(2),
  393. "sink_mockSink_0_records_out_total": int64(2),
  394. },
  395. }, {
  396. Name: `TestRestRule8`,
  397. Sql: `SELECT GetMessage(concat("messages/",ts))->text as message FROM demo WHERE size > 3`,
  398. R: [][]map[string]interface{}{
  399. {{
  400. "message": "1541152486822 content",
  401. }},
  402. {{
  403. "message": "1541152488442 content",
  404. }},
  405. },
  406. M: map[string]interface{}{
  407. "op_2_filter_0_exceptions_total": int64(0),
  408. "op_2_filter_0_process_latency_us": int64(0),
  409. "op_2_filter_0_records_in_total": int64(5),
  410. "op_2_filter_0_records_out_total": int64(2),
  411. "sink_mockSink_0_exceptions_total": int64(0),
  412. "sink_mockSink_0_records_in_total": int64(2),
  413. "sink_mockSink_0_records_out_total": int64(2),
  414. },
  415. }, {
  416. Name: `TestRestRule9`,
  417. Sql: `SELECT SearchMessage(name, size, shelf)->text as message FROM shelves`,
  418. R: [][]map[string]interface{}{
  419. {{
  420. "message": "name12sub1",
  421. }},
  422. {{
  423. "message": "name23sub2",
  424. }},
  425. {{
  426. "message": "name34sub3",
  427. }},
  428. },
  429. M: map[string]interface{}{
  430. "op_2_project_0_exceptions_total": int64(0),
  431. "op_2_project_0_process_latency_us": int64(0),
  432. "op_2_project_0_records_in_total": int64(3),
  433. "op_2_project_0_records_out_total": int64(3),
  434. "sink_mockSink_0_exceptions_total": int64(0),
  435. "sink_mockSink_0_records_in_total": int64(3),
  436. "sink_mockSink_0_records_out_total": int64(3),
  437. },
  438. // TODO support * as one of the parameters
  439. //},{
  440. // Name: `TestRestRule10`,
  441. // Sql: `SELECT UpdateMessage(message_id, *)->text as message FROM mes`,
  442. // R: [][]map[string]interface{}{
  443. // {{
  444. // "message": "message1",
  445. // }},
  446. // {{
  447. // "message": "message2",
  448. // }},
  449. // {{
  450. // "message": "message3",
  451. // }},
  452. // },
  453. // M: map[string]interface{}{
  454. // "op_2_project_0_exceptions_total": int64(0),
  455. // "op_2_project_0_process_latency_us": int64(0),
  456. // "op_2_project_0_records_in_total": int64(3),
  457. // "op_2_project_0_records_out_total": int64(3),
  458. //
  459. // "sink_mockSink_0_exceptions_total": int64(0),
  460. // "sink_mockSink_0_records_in_total": int64(3),
  461. // "sink_mockSink_0_records_out_total": int64(3),
  462. // },
  463. }, {
  464. Name: `TestRestRule11`,
  465. Sql: `SELECT PatchMessage(message_id, text)->text as message FROM mes`,
  466. R: [][]map[string]interface{}{
  467. {{
  468. "message": "message1",
  469. }},
  470. {{
  471. "message": "message2",
  472. }},
  473. {{
  474. "message": "message3",
  475. }},
  476. },
  477. M: map[string]interface{}{
  478. "op_2_project_0_exceptions_total": int64(0),
  479. "op_2_project_0_process_latency_us": int64(0),
  480. "op_2_project_0_records_in_total": int64(3),
  481. "op_2_project_0_records_out_total": int64(3),
  482. "sink_mockSink_0_exceptions_total": int64(0),
  483. "sink_mockSink_0_records_in_total": int64(3),
  484. "sink_mockSink_0_records_out_total": int64(3),
  485. },
  486. },
  487. }
  488. topotest.HandleStream(true, streamList, t)
  489. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  490. BufferLength: 100,
  491. SendError: true,
  492. }, 0)
  493. }
  494. func jsonOut(w http.ResponseWriter, err error, out interface{}) {
  495. w.Header().Add("Content-Type", "application/json")
  496. enc := json.NewEncoder(w)
  497. err = enc.Encode(out)
  498. // Problems encoding
  499. if err != nil {
  500. http.Error(w, err.Error(), http.StatusBadRequest)
  501. }
  502. }
  503. type Resolver map[string]reflect.Value
  504. func (self Resolver) Resolve(name string, _ []reflect.Value) (reflect.Value, error) {
  505. return self[name], nil
  506. }
  507. func SayHello(name string) map[string]interface{} {
  508. return map[string]interface{}{
  509. "message": name,
  510. }
  511. }
  512. func get_feature(img []byte) map[string]interface{} {
  513. l := len(string(img))
  514. return map[string]interface{}{
  515. "feature": []map[string]interface{}{
  516. {
  517. "features": []float64{-1.444, 2.55452, 5.121},
  518. "box": map[string]interface{}{
  519. "x": 153,
  520. "y": 107,
  521. "w": 174,
  522. "h": 100 + l,
  523. },
  524. }, {
  525. "features": []float64{1.444, -2.55452, -5.121},
  526. "box": map[string]interface{}{
  527. "x": 257,
  528. "y": 92,
  529. "w": 169,
  530. "h": 208,
  531. },
  532. },
  533. },
  534. }
  535. }
  536. func object_detection(command string, image string) map[string]interface{} {
  537. out := map[string]interface{}{
  538. "info": command,
  539. "code": 200,
  540. "image": image,
  541. "result": command + " success",
  542. "type": "S",
  543. }
  544. return out
  545. }
  546. func getStatus() bool {
  547. return true
  548. }
  549. func TestMsgpackService(t *testing.T) {
  550. // mock server
  551. res := Resolver{"SayHello": reflect.ValueOf(SayHello), "object_detection": reflect.ValueOf(object_detection), "get_feature": reflect.ValueOf(get_feature), "getStatus": reflect.ValueOf(getStatus)}
  552. serv := rpc.NewServer(res, true, nil)
  553. l, _ := net.Listen("tcp", ":50000")
  554. serv.Listen(l)
  555. go serv.Run()
  556. // Comment out because the bug in the msgpack rpc
  557. // defer serv.Stop()
  558. //Reset
  559. streamList := []string{"helloStr", "commands", "fakeBin"}
  560. topotest.HandleStream(false, streamList, t)
  561. //Data setup
  562. var tests = []topotest.RuleTest{
  563. {
  564. Name: `TestRestRule1`,
  565. Sql: `SELECT helloFromMsgpack(name) as wc FROM helloStr`,
  566. R: [][]map[string]interface{}{
  567. {{
  568. "wc": map[string]interface{}{
  569. "message": "world",
  570. },
  571. }},
  572. {{
  573. "wc": map[string]interface{}{
  574. "message": "golang",
  575. },
  576. }},
  577. {{
  578. "wc": map[string]interface{}{
  579. "message": "peacock",
  580. },
  581. }},
  582. },
  583. M: map[string]interface{}{
  584. "op_2_project_0_exceptions_total": int64(0),
  585. "op_2_project_0_process_latency_us": int64(0),
  586. "op_2_project_0_records_in_total": int64(3),
  587. "op_2_project_0_records_out_total": int64(3),
  588. "sink_mockSink_0_exceptions_total": int64(0),
  589. "sink_mockSink_0_records_in_total": int64(3),
  590. "sink_mockSink_0_records_out_total": int64(3),
  591. },
  592. }, {
  593. Name: `TestRestRule2`,
  594. Sql: `SELECT objectDetectFromMsgpack(*)->result FROM commands`,
  595. R: [][]map[string]interface{}{
  596. {{
  597. "kuiper_field_0": "get success",
  598. }},
  599. {{
  600. "kuiper_field_0": "detect success",
  601. }},
  602. {{
  603. "kuiper_field_0": "delete success",
  604. }},
  605. },
  606. M: map[string]interface{}{
  607. "op_2_project_0_exceptions_total": int64(0),
  608. "op_2_project_0_process_latency_us": int64(0),
  609. "op_2_project_0_records_in_total": int64(3),
  610. "op_2_project_0_records_out_total": int64(3),
  611. "sink_mockSink_0_exceptions_total": int64(0),
  612. "sink_mockSink_0_records_in_total": int64(3),
  613. "sink_mockSink_0_records_out_total": int64(3),
  614. },
  615. }, {
  616. Name: `TestRestRule3`,
  617. Sql: `SELECT getFeatureFromMsgpack(self)->feature[0]->box->h FROM fakeBin`,
  618. R: [][]map[string]interface{}{
  619. {{
  620. "kuiper_field_0": float64(106), //Convert by the testing tool
  621. }},
  622. {{
  623. "kuiper_field_0": float64(107),
  624. }},
  625. {{
  626. "kuiper_field_0": float64(108),
  627. }},
  628. },
  629. M: map[string]interface{}{
  630. "op_2_project_0_exceptions_total": int64(0),
  631. "op_2_project_0_process_latency_us": int64(0),
  632. "op_2_project_0_records_in_total": int64(3),
  633. "op_2_project_0_records_out_total": int64(3),
  634. "sink_mockSink_0_exceptions_total": int64(0),
  635. "sink_mockSink_0_records_in_total": int64(3),
  636. "sink_mockSink_0_records_out_total": int64(3),
  637. },
  638. //}, {
  639. // Name: `TestRestRule4`,
  640. // Sql: `SELECT getStatusFromMsgpack(), command FROM commands`,
  641. // R: [][]map[string]interface{}{
  642. // {{
  643. // "getStatusFromRest": true,
  644. // "command": "get",
  645. // }},
  646. // {{
  647. // "getStatusFromRest": true,
  648. // "command": "detect",
  649. // }},
  650. // {{
  651. // "getStatusFromRest": true,
  652. // "command": "delete",
  653. // }},
  654. // },
  655. // M: map[string]interface{}{
  656. // "op_2_project_0_exceptions_total": int64(0),
  657. // "op_2_project_0_process_latency_us": int64(0),
  658. // "op_2_project_0_records_in_total": int64(3),
  659. // "op_2_project_0_records_out_total": int64(3),
  660. //
  661. // "sink_mockSink_0_exceptions_total": int64(0),
  662. // "sink_mockSink_0_records_in_total": int64(3),
  663. // "sink_mockSink_0_records_out_total": int64(3),
  664. // },
  665. },
  666. }
  667. topotest.HandleStream(true, streamList, t)
  668. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  669. BufferLength: 100,
  670. SendError: true,
  671. }, 0)
  672. }
  673. type server struct {
  674. pb.UnimplementedGreeterServer
  675. }
  676. func (s *server) SayHello(_ context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  677. return &pb.HelloReply{Message: in.GetName()}, nil
  678. }
  679. func (s *server) ObjectDetection(_ context.Context, in *pb.ObjectDetectionRequest) (*pb.ObjectDetectionResponse, error) {
  680. return &pb.ObjectDetectionResponse{
  681. Info: in.Cmd,
  682. Code: 200,
  683. Image: in.Base64Img,
  684. Result: in.Cmd + " success",
  685. Type: "S",
  686. }, nil
  687. }
  688. func (s *server) GetFeature(_ context.Context, v *wrappers.BytesValue) (*pb.FeatureResponse, error) {
  689. l := len(string(v.Value))
  690. return &pb.FeatureResponse{
  691. Feature: []*pb.FeatureResult{
  692. {
  693. Features: []float32{-1.444, 2.55452, 5.121},
  694. Box: &pb.Box{
  695. X: 153,
  696. Y: 107,
  697. W: 174,
  698. H: int32(100 + l),
  699. },
  700. },
  701. {
  702. Features: []float32{1.444, -2.55452, -5.121},
  703. Box: &pb.Box{
  704. X: 257,
  705. Y: 92,
  706. W: 169,
  707. H: 208,
  708. },
  709. },
  710. },
  711. }, nil
  712. }
  713. func (s *server) GetStatus(context.Context, *empty.Empty) (*wrappers.BoolValue, error) {
  714. return &wrappers.BoolValue{Value: true}, nil
  715. }
  716. func TestGrpcService(t *testing.T) {
  717. lis, err := net.Listen("tcp", ":50051")
  718. if err != nil {
  719. kconf.Log.Fatalf("failed to listen: %v", err)
  720. }
  721. s := grpc.NewServer()
  722. pb.RegisterGreeterServer(s, &server{})
  723. go func() {
  724. if err := s.Serve(lis); err != nil {
  725. kconf.Log.Fatalf("failed to serve: %v", err)
  726. }
  727. }()
  728. defer s.Stop()
  729. //Reset
  730. streamList := []string{"helloStr", "commands", "fakeBin"}
  731. topotest.HandleStream(false, streamList, t)
  732. //Data setup
  733. var tests = []topotest.RuleTest{
  734. {
  735. Name: `TestRestRule1`,
  736. Sql: `SELECT helloFromGrpc(name) as wc FROM helloStr`,
  737. R: [][]map[string]interface{}{
  738. {{
  739. "wc": map[string]interface{}{
  740. "message": "world",
  741. },
  742. }},
  743. {{
  744. "wc": map[string]interface{}{
  745. "message": "golang",
  746. },
  747. }},
  748. {{
  749. "wc": map[string]interface{}{
  750. "message": "peacock",
  751. },
  752. }},
  753. },
  754. M: map[string]interface{}{
  755. "op_2_project_0_exceptions_total": int64(0),
  756. "op_2_project_0_process_latency_us": int64(0),
  757. "op_2_project_0_records_in_total": int64(3),
  758. "op_2_project_0_records_out_total": int64(3),
  759. "sink_mockSink_0_exceptions_total": int64(0),
  760. "sink_mockSink_0_records_in_total": int64(3),
  761. "sink_mockSink_0_records_out_total": int64(3),
  762. },
  763. }, {
  764. Name: `TestRestRule2`,
  765. Sql: `SELECT objectDetectFromGrpc(cmd, base64_img)->result FROM commands`,
  766. R: [][]map[string]interface{}{
  767. {{
  768. "kuiper_field_0": "get success",
  769. }},
  770. {{
  771. "kuiper_field_0": "detect success",
  772. }},
  773. {{
  774. "kuiper_field_0": "delete success",
  775. }},
  776. },
  777. M: map[string]interface{}{
  778. "op_2_project_0_exceptions_total": int64(0),
  779. "op_2_project_0_process_latency_us": int64(0),
  780. "op_2_project_0_records_in_total": int64(3),
  781. "op_2_project_0_records_out_total": int64(3),
  782. "sink_mockSink_0_exceptions_total": int64(0),
  783. "sink_mockSink_0_records_in_total": int64(3),
  784. "sink_mockSink_0_records_out_total": int64(3),
  785. },
  786. }, {
  787. Name: `TestRestRule3`,
  788. Sql: `SELECT getFeatureFromGrpc(self)->feature[0]->box->h FROM fakeBin`,
  789. R: [][]map[string]interface{}{
  790. {{
  791. "kuiper_field_0": float64(106), //Convert by the testing tool
  792. }},
  793. {{
  794. "kuiper_field_0": float64(107),
  795. }},
  796. {{
  797. "kuiper_field_0": float64(108),
  798. }},
  799. },
  800. M: map[string]interface{}{
  801. "op_2_project_0_exceptions_total": int64(0),
  802. "op_2_project_0_process_latency_us": int64(0),
  803. "op_2_project_0_records_in_total": int64(3),
  804. "op_2_project_0_records_out_total": int64(3),
  805. "sink_mockSink_0_exceptions_total": int64(0),
  806. "sink_mockSink_0_records_in_total": int64(3),
  807. "sink_mockSink_0_records_out_total": int64(3),
  808. },
  809. }, {
  810. Name: `TestRestRule4`,
  811. Sql: `SELECT getStatusFromGrpc(), cmd FROM commands`,
  812. R: [][]map[string]interface{}{
  813. {{
  814. "getStatusFromGrpc": true,
  815. "cmd": "get",
  816. }},
  817. {{
  818. "getStatusFromGrpc": true,
  819. "cmd": "detect",
  820. }},
  821. {{
  822. "getStatusFromGrpc": true,
  823. "cmd": "delete",
  824. }},
  825. },
  826. M: map[string]interface{}{
  827. "op_2_project_0_exceptions_total": int64(0),
  828. "op_2_project_0_process_latency_us": int64(0),
  829. "op_2_project_0_records_in_total": int64(3),
  830. "op_2_project_0_records_out_total": int64(3),
  831. "sink_mockSink_0_exceptions_total": int64(0),
  832. "sink_mockSink_0_records_in_total": int64(3),
  833. "sink_mockSink_0_records_out_total": int64(3),
  834. },
  835. },
  836. }
  837. topotest.HandleStream(true, streamList, t)
  838. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  839. BufferLength: 100,
  840. SendError: true,
  841. }, 0)
  842. }