misc_func_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772
  1. package operators
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestMiscFunc_Apply1(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data *xsql.Tuple
  16. result []map[string]interface{}
  17. }{
  18. {
  19. sql: "SELECT md5(a) AS a FROM test",
  20. data: &xsql.Tuple{
  21. Emitter: "test",
  22. Message: xsql.Message{
  23. "a": "The quick brown fox jumps over the lazy dog",
  24. "b": "myb",
  25. "c": "myc",
  26. },
  27. },
  28. result: []map[string]interface{}{{
  29. "a": strings.ToLower("9E107D9D372BB6826BD81D3542A419D6"),
  30. }},
  31. },
  32. {
  33. sql: "SELECT sha1(a) AS a FROM test",
  34. data: &xsql.Tuple{
  35. Emitter: "test",
  36. Message: xsql.Message{
  37. "a": "The quick brown fox jumps over the lazy dog",
  38. "b": "myb",
  39. "c": "myc",
  40. },
  41. },
  42. result: []map[string]interface{}{{
  43. "a": strings.ToLower("2FD4E1C67A2D28FCED849EE1BB76E7391B93EB12"),
  44. }},
  45. },
  46. {
  47. sql: "SELECT sha256(a) AS a FROM test",
  48. data: &xsql.Tuple{
  49. Emitter: "test",
  50. Message: xsql.Message{
  51. "a": "The quick brown fox jumps over the lazy dog",
  52. "b": "myb",
  53. "c": "myc",
  54. },
  55. },
  56. result: []map[string]interface{}{{
  57. "a": strings.ToLower("D7A8FBB307D7809469CA9ABCB0082E4F8D5651E46D3CDB762D02D0BF37C9E592"),
  58. }},
  59. },
  60. {
  61. sql: "SELECT sha384(a) AS a FROM test",
  62. data: &xsql.Tuple{
  63. Emitter: "test",
  64. Message: xsql.Message{
  65. "a": "The quick brown fox jumps over the lazy dog",
  66. "b": "myb",
  67. "c": "myc",
  68. },
  69. },
  70. result: []map[string]interface{}{{
  71. "a": strings.ToLower("CA737F1014A48F4C0B6DD43CB177B0AFD9E5169367544C494011E3317DBF9A509CB1E5DC1E85A941BBEE3D7F2AFBC9B1"),
  72. }},
  73. },
  74. {
  75. sql: "SELECT sha512(a) AS a FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": "The quick brown fox jumps over the lazy dog",
  80. "b": "myb",
  81. "c": "myc",
  82. },
  83. },
  84. result: []map[string]interface{}{{
  85. "a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
  86. }},
  87. },
  88. {
  89. sql: "SELECT mqtt(topic) AS a FROM test",
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{},
  93. Metadata: xsql.Metadata{
  94. "topic": "devices/device_001/message",
  95. },
  96. },
  97. result: []map[string]interface{}{{
  98. "a": "devices/device_001/message",
  99. }},
  100. },
  101. {
  102. sql: "SELECT mqtt(topic) AS a FROM test",
  103. data: &xsql.Tuple{
  104. Emitter: "test",
  105. Message: xsql.Message{},
  106. Metadata: xsql.Metadata{
  107. "topic": "devices/device_001/message",
  108. },
  109. },
  110. result: []map[string]interface{}{{
  111. "a": "devices/device_001/message",
  112. }},
  113. },
  114. {
  115. sql: "SELECT topic, mqtt(topic) AS a FROM test",
  116. data: &xsql.Tuple{
  117. Emitter: "test",
  118. Message: xsql.Message{
  119. "topic": "fff",
  120. },
  121. Metadata: xsql.Metadata{
  122. "topic": "devices/device_001/message",
  123. },
  124. },
  125. result: []map[string]interface{}{{
  126. "topic": "fff",
  127. "a": "devices/device_001/message",
  128. }},
  129. },
  130. {
  131. sql: "SELECT cardinality(arr) as r FROM test",
  132. data: &xsql.Tuple{
  133. Emitter: "test",
  134. Message: xsql.Message{
  135. "temperature": 43.2,
  136. "arr": []int{},
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "r": float64(0),
  141. }},
  142. },
  143. {
  144. sql: "SELECT cardinality(arr) as r FROM test",
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "temperature": 43.2,
  149. "arr": []int{1, 2, 3, 4, 5},
  150. },
  151. },
  152. result: []map[string]interface{}{{
  153. "r": float64(5),
  154. }},
  155. },
  156. {
  157. sql: "SELECT isNull(arr) as r FROM test",
  158. data: &xsql.Tuple{
  159. Emitter: "test",
  160. Message: xsql.Message{
  161. "temperature": 43.2,
  162. "arr": []int{},
  163. },
  164. },
  165. result: []map[string]interface{}{{
  166. "r": false,
  167. }},
  168. },
  169. {
  170. sql: "SELECT isNull(arr) as r FROM test",
  171. data: &xsql.Tuple{
  172. Emitter: "test",
  173. Message: xsql.Message{
  174. "temperature": 43.2,
  175. "arr": []float64(nil),
  176. },
  177. },
  178. result: []map[string]interface{}{{
  179. "r": true,
  180. }},
  181. },
  182. {
  183. sql: "SELECT isNull(rec) as r FROM test",
  184. data: &xsql.Tuple{
  185. Emitter: "test",
  186. Message: xsql.Message{
  187. "temperature": 43.2,
  188. "rec": map[string]interface{}(nil),
  189. },
  190. },
  191. result: []map[string]interface{}{{
  192. "r": true,
  193. }},
  194. },
  195. }
  196. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  197. contextLogger := common.Log.WithField("rule", "TestMiscFunc_Apply1")
  198. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  199. for i, tt := range tests {
  200. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  201. if err != nil || stmt == nil {
  202. t.Errorf("parse sql %s error %v", tt.sql, err)
  203. }
  204. pp := &ProjectOp{Fields: stmt.Fields}
  205. pp.isTest = true
  206. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  207. result := pp.Apply(ctx, tt.data, fv, afv)
  208. var mapRes []map[string]interface{}
  209. if v, ok := result.([]byte); ok {
  210. err := json.Unmarshal(v, &mapRes)
  211. if err != nil {
  212. t.Errorf("Failed to parse the input into map.\n")
  213. continue
  214. }
  215. if !reflect.DeepEqual(tt.result, mapRes) {
  216. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  217. }
  218. } else {
  219. t.Errorf("The returned result is not type of []byte\n")
  220. }
  221. }
  222. }
  223. func TestMqttFunc_Apply2(t *testing.T) {
  224. var tests = []struct {
  225. sql string
  226. data xsql.JoinTupleSets
  227. result []map[string]interface{}
  228. }{
  229. {
  230. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
  231. data: xsql.JoinTupleSets{
  232. xsql.JoinTuple{
  233. Tuples: []xsql.Tuple{
  234. {Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  235. {Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  236. },
  237. },
  238. },
  239. result: []map[string]interface{}{{
  240. "id1": "1",
  241. "a": "devices/type1/device001",
  242. "b": "devices/type2/device001",
  243. }},
  244. },
  245. }
  246. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  247. contextLogger := common.Log.WithField("rule", "TestMqttFunc_Apply2")
  248. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  249. for i, tt := range tests {
  250. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  251. if err != nil || stmt == nil {
  252. t.Errorf("parse sql %s error %v", tt.sql, err)
  253. }
  254. pp := &ProjectOp{Fields: stmt.Fields}
  255. pp.isTest = true
  256. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  257. result := pp.Apply(ctx, tt.data, fv, afv)
  258. var mapRes []map[string]interface{}
  259. if v, ok := result.([]byte); ok {
  260. err := json.Unmarshal(v, &mapRes)
  261. if err != nil {
  262. t.Errorf("Failed to parse the input into map.\n")
  263. continue
  264. }
  265. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  266. if !reflect.DeepEqual(tt.result, mapRes) {
  267. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  268. }
  269. } else {
  270. t.Errorf("The returned result is not type of []byte\n")
  271. }
  272. }
  273. }
  274. func TestMetaFunc_Apply1(t *testing.T) {
  275. var tests = []struct {
  276. sql string
  277. data interface{}
  278. result interface{}
  279. }{
  280. {
  281. sql: "SELECT topic, meta(topic) AS a FROM test",
  282. data: &xsql.Tuple{
  283. Emitter: "test",
  284. Message: xsql.Message{
  285. "topic": "fff",
  286. },
  287. Metadata: xsql.Metadata{
  288. "topic": "devices/device_001/message",
  289. },
  290. },
  291. result: []map[string]interface{}{{
  292. "topic": "fff",
  293. "a": "devices/device_001/message",
  294. }},
  295. },
  296. {
  297. sql: "SELECT meta(device) as d, meta(temperature->device) as r FROM test",
  298. data: &xsql.Tuple{
  299. Emitter: "test",
  300. Message: xsql.Message{
  301. "temperature": 43.2,
  302. },
  303. Metadata: xsql.Metadata{
  304. "temperature": map[string]interface{}{
  305. "id": "dfadfasfas",
  306. "device": "device2",
  307. },
  308. "device": "gateway",
  309. },
  310. },
  311. result: []map[string]interface{}{{
  312. "d": "gateway",
  313. "r": "device2",
  314. }},
  315. },
  316. {
  317. sql: "SELECT meta(*) as r FROM test",
  318. data: &xsql.Tuple{
  319. Emitter: "test",
  320. Message: xsql.Message{
  321. "temperature": 43.2,
  322. },
  323. Metadata: xsql.Metadata{
  324. "temperature": map[string]interface{}{
  325. "id": "dfadfasfas",
  326. "device": "device2",
  327. },
  328. "device": "gateway",
  329. },
  330. },
  331. result: []map[string]interface{}{{
  332. "r": map[string]interface{}{
  333. "temperature": map[string]interface{}{
  334. "id": "dfadfasfas",
  335. "device": "device2",
  336. },
  337. "device": "gateway",
  338. },
  339. }},
  340. },
  341. {
  342. sql: "SELECT topic, meta(`Light-diming`->device) AS a FROM test",
  343. data: &xsql.Tuple{
  344. Emitter: "test",
  345. Message: xsql.Message{
  346. "topic": "fff",
  347. },
  348. Metadata: xsql.Metadata{
  349. "Light-diming": map[string]interface{}{
  350. "device": "device2",
  351. },
  352. },
  353. },
  354. result: []map[string]interface{}{{
  355. "topic": "fff",
  356. "a": "device2",
  357. }},
  358. },
  359. }
  360. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  361. contextLogger := common.Log.WithField("rule", "TestMetaFunc_Apply1")
  362. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  363. for i, tt := range tests {
  364. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  365. if err != nil || stmt == nil {
  366. t.Errorf("parse sql %s error %v", tt.sql, err)
  367. }
  368. pp := &ProjectOp{Fields: stmt.Fields}
  369. pp.isTest = true
  370. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  371. result := pp.Apply(ctx, tt.data, fv, afv)
  372. var mapRes []map[string]interface{}
  373. if v, ok := result.([]byte); ok {
  374. err := json.Unmarshal(v, &mapRes)
  375. if err != nil {
  376. t.Errorf("Failed to parse the input into map.\n")
  377. continue
  378. }
  379. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  380. if !reflect.DeepEqual(tt.result, mapRes) {
  381. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  382. }
  383. } else {
  384. t.Errorf("The returned result is not type of []byte\n")
  385. }
  386. }
  387. }
  388. func TestJsonPathFunc_Apply1(t *testing.T) {
  389. var tests = []struct {
  390. sql string
  391. data interface{}
  392. result interface{}
  393. }{
  394. {
  395. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  396. data: &xsql.Tuple{
  397. Emitter: "test",
  398. Message: xsql.Message{
  399. "class": "warrior",
  400. "equipment": map[string]interface{}{
  401. "rings": []map[string]interface{}{
  402. {
  403. "name": "ring of despair",
  404. "weight": 0.1,
  405. }, {
  406. "name": "ring of strength",
  407. "weight": 2.4,
  408. },
  409. },
  410. "arm_right": "Sword of flame",
  411. "arm_left": "Shield of faith",
  412. },
  413. },
  414. },
  415. result: []map[string]interface{}{{
  416. "a": "Sword of flame",
  417. }},
  418. }, {
  419. sql: `SELECT json_path_query(equipment, "$.rings[*].weight") AS a FROM test`,
  420. data: &xsql.Tuple{
  421. Emitter: "test",
  422. Message: xsql.Message{
  423. "class": "warrior",
  424. "equipment": map[string]interface{}{
  425. "rings": []interface{}{
  426. map[string]interface{}{
  427. "name": "ring of despair",
  428. "weight": 0.1,
  429. }, map[string]interface{}{
  430. "name": "ring of strength",
  431. "weight": 2.4,
  432. },
  433. },
  434. "arm_right": "Sword of flame",
  435. "arm_left": "Shield of faith",
  436. },
  437. },
  438. },
  439. result: []map[string]interface{}{{
  440. "a": []interface{}{
  441. 0.1, 2.4,
  442. },
  443. }},
  444. }, {
  445. sql: `SELECT json_path_query_first(equipment, "$.rings[*].weight") AS a FROM test`,
  446. data: &xsql.Tuple{
  447. Emitter: "test",
  448. Message: xsql.Message{
  449. "class": "warrior",
  450. "equipment": map[string]interface{}{
  451. "rings": []interface{}{
  452. map[string]interface{}{
  453. "name": "ring of despair",
  454. "weight": 0.1,
  455. }, map[string]interface{}{
  456. "name": "ring of strength",
  457. "weight": 2.4,
  458. },
  459. },
  460. "arm_right": "Sword of flame",
  461. "arm_left": "Shield of faith",
  462. },
  463. },
  464. },
  465. result: []map[string]interface{}{{
  466. "a": 0.1,
  467. }},
  468. }, {
  469. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1]") AS a FROM test`,
  470. data: &xsql.Tuple{
  471. Emitter: "test",
  472. Message: xsql.Message{
  473. "class": "warrior",
  474. "equipment": map[string]interface{}{
  475. "rings": []interface{}{
  476. map[string]interface{}{
  477. "name": "ring of despair",
  478. "weight": 0.1,
  479. }, map[string]interface{}{
  480. "name": "ring of strength",
  481. "weight": 2.4,
  482. },
  483. },
  484. "arm_right": "Sword of flame",
  485. "arm_left": "Shield of faith",
  486. },
  487. },
  488. },
  489. result: []map[string]interface{}{{
  490. "a": []interface{}{
  491. map[string]interface{}{
  492. "name": "ring of strength",
  493. "weight": 2.4,
  494. },
  495. },
  496. }},
  497. }, {
  498. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1].name") AS a FROM test`,
  499. data: &xsql.Tuple{
  500. Emitter: "test",
  501. Message: xsql.Message{
  502. "class": "warrior",
  503. "equipment": map[string]interface{}{
  504. "rings": []interface{}{
  505. map[string]interface{}{
  506. "name": "ring of despair",
  507. "weight": 0.1,
  508. }, map[string]interface{}{
  509. "name": "ring of strength",
  510. "weight": 2.4,
  511. },
  512. },
  513. "arm_right": "Sword of flame",
  514. "arm_left": "Shield of faith",
  515. },
  516. },
  517. },
  518. result: []map[string]interface{}{{
  519. "a": []interface{}{
  520. "ring of strength",
  521. },
  522. }},
  523. }, {
  524. sql: `SELECT json_path_exists(equipment, "$.rings[? @.weight>5]") AS a FROM test`,
  525. data: &xsql.Tuple{
  526. Emitter: "test",
  527. Message: xsql.Message{
  528. "class": "warrior",
  529. "equipment": map[string]interface{}{
  530. "rings": []interface{}{
  531. map[string]interface{}{
  532. "name": "ring of despair",
  533. "weight": 0.1,
  534. }, map[string]interface{}{
  535. "name": "ring of strength",
  536. "weight": 2.4,
  537. },
  538. },
  539. "arm_right": "Sword of flame",
  540. "arm_left": "Shield of faith",
  541. },
  542. },
  543. },
  544. result: []map[string]interface{}{{
  545. "a": false,
  546. }},
  547. }, {
  548. sql: `SELECT json_path_exists(equipment, "$.ring1") AS a FROM test`,
  549. data: &xsql.Tuple{
  550. Emitter: "test",
  551. Message: xsql.Message{
  552. "class": "warrior",
  553. "equipment": map[string]interface{}{
  554. "rings": []interface{}{
  555. map[string]interface{}{
  556. "name": "ring of despair",
  557. "weight": 0.1,
  558. }, map[string]interface{}{
  559. "name": "ring of strength",
  560. "weight": 2.4,
  561. },
  562. },
  563. "arm_right": "Sword of flame",
  564. "arm_left": "Shield of faith",
  565. },
  566. },
  567. },
  568. result: []map[string]interface{}{{
  569. "a": false,
  570. }},
  571. }, {
  572. sql: `SELECT json_path_exists(equipment, "$.rings") AS a FROM test`,
  573. data: &xsql.Tuple{
  574. Emitter: "test",
  575. Message: xsql.Message{
  576. "class": "warrior",
  577. "equipment": map[string]interface{}{
  578. "rings": []interface{}{
  579. map[string]interface{}{
  580. "name": "ring of despair",
  581. "weight": 0.1,
  582. }, map[string]interface{}{
  583. "name": "ring of strength",
  584. "weight": 2.4,
  585. },
  586. },
  587. "arm_right": "Sword of flame",
  588. "arm_left": "Shield of faith",
  589. },
  590. },
  591. },
  592. result: []map[string]interface{}{{
  593. "a": true,
  594. }},
  595. }, {
  596. sql: `SELECT json_path_query(equipment, "$.rings[? (@.weight>1)].name") AS a FROM test`,
  597. data: &xsql.Tuple{
  598. Emitter: "test",
  599. Message: xsql.Message{
  600. "class": "warrior",
  601. "equipment": map[string]interface{}{
  602. "rings": []map[string]interface{}{
  603. {
  604. "name": "ring of despair",
  605. "weight": 0.1,
  606. }, {
  607. "name": "ring of strength",
  608. "weight": 2.4,
  609. },
  610. },
  611. "arm_right": "Sword of flame",
  612. "arm_left": "Shield of faith",
  613. },
  614. },
  615. },
  616. result: []map[string]interface{}{{
  617. "a": []interface{}{
  618. "ring of strength",
  619. },
  620. }},
  621. }, {
  622. sql: `SELECT json_path_query(equipment, "$.rings[*]") AS a FROM test`,
  623. data: &xsql.Tuple{
  624. Emitter: "test",
  625. Message: xsql.Message{
  626. "class": "warrior",
  627. "equipment": map[string]interface{}{
  628. "rings": []float64{
  629. 0.1, 2.4,
  630. },
  631. "arm_right": "Sword of flame",
  632. "arm_left": "Shield of faith",
  633. },
  634. },
  635. },
  636. result: []map[string]interface{}{{
  637. "a": []interface{}{
  638. 0.1, 2.4,
  639. },
  640. }},
  641. }, {
  642. sql: `SELECT json_path_query(equipment, "$.rings") AS a FROM test`,
  643. data: &xsql.Tuple{
  644. Emitter: "test",
  645. Message: xsql.Message{
  646. "class": "warrior",
  647. "equipment": map[string]interface{}{
  648. "rings": []float64{
  649. 0.1, 2.4,
  650. },
  651. "arm_right": "Sword of flame",
  652. "arm_left": "Shield of faith",
  653. },
  654. },
  655. },
  656. result: []map[string]interface{}{{
  657. "a": []interface{}{
  658. 0.1, 2.4,
  659. },
  660. }},
  661. }, {
  662. sql: `SELECT json_path_query(equipment, "$[0].rings[1]") AS a FROM test`,
  663. data: &xsql.Tuple{
  664. Emitter: "test",
  665. Message: xsql.Message{
  666. "class": "warrior",
  667. "equipment": []map[string]interface{}{
  668. {
  669. "rings": []float64{
  670. 0.1, 2.4,
  671. },
  672. "arm_right": "Sword of flame",
  673. "arm_left": "Shield of faith",
  674. },
  675. },
  676. },
  677. },
  678. result: []map[string]interface{}{{
  679. "a": 2.4,
  680. }},
  681. }, {
  682. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  683. data: &xsql.Tuple{
  684. Emitter: "test",
  685. Message: xsql.Message{
  686. "class": "warrior",
  687. "equipment": []map[string]interface{}{
  688. {
  689. "rings": []float64{
  690. 0.1, 2.4,
  691. },
  692. "arm.right": "Sword of flame",
  693. "arm.left": "Shield of faith",
  694. },
  695. },
  696. },
  697. },
  698. result: []map[string]interface{}{{
  699. "a": "Shield of faith",
  700. }},
  701. }, {
  702. sql: "SELECT json_path_query(equipment, \"$[\\\"arm.left\\\"]\") AS a FROM test",
  703. data: &xsql.Tuple{
  704. Emitter: "test",
  705. Message: xsql.Message{
  706. "class": "warrior",
  707. "equipment": `{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}`,
  708. },
  709. },
  710. result: []map[string]interface{}{{
  711. "a": "Shield of faith",
  712. }},
  713. }, {
  714. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  715. data: &xsql.Tuple{
  716. Emitter: "test",
  717. Message: xsql.Message{
  718. "class": "warrior",
  719. "equipment": `[{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}]`,
  720. },
  721. },
  722. result: []map[string]interface{}{{
  723. "a": "Shield of faith",
  724. }},
  725. },
  726. }
  727. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  728. contextLogger := common.Log.WithField("rule", "TestJsonFunc_Apply1")
  729. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  730. for i, tt := range tests {
  731. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  732. if err != nil || stmt == nil {
  733. t.Errorf("parse sql %s error %v", tt.sql, err)
  734. }
  735. pp := &ProjectOp{Fields: stmt.Fields}
  736. pp.isTest = true
  737. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  738. result := pp.Apply(ctx, tt.data, fv, afv)
  739. var mapRes []map[string]interface{}
  740. if v, ok := result.([]byte); ok {
  741. err := json.Unmarshal(v, &mapRes)
  742. if err != nil {
  743. t.Errorf("Failed to parse the input into map.\n")
  744. continue
  745. }
  746. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  747. if !reflect.DeepEqual(tt.result, mapRes) {
  748. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  749. }
  750. } else {
  751. t.Errorf("The returned result is not type of []byte but found %v", result)
  752. }
  753. }
  754. }