misc_func_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786
  1. package operators
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestMiscFunc_Apply1(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data *xsql.Tuple
  16. result []map[string]interface{}
  17. }{
  18. {
  19. sql: "SELECT md5(a) AS a FROM test",
  20. data: &xsql.Tuple{
  21. Emitter: "test",
  22. Message: xsql.Message{
  23. "a": "The quick brown fox jumps over the lazy dog",
  24. "b": "myb",
  25. "c": "myc",
  26. },
  27. },
  28. result: []map[string]interface{}{{
  29. "a": strings.ToLower("9E107D9D372BB6826BD81D3542A419D6"),
  30. }},
  31. },
  32. {
  33. sql: "SELECT sha1(a) AS a FROM test",
  34. data: &xsql.Tuple{
  35. Emitter: "test",
  36. Message: xsql.Message{
  37. "a": "The quick brown fox jumps over the lazy dog",
  38. "b": "myb",
  39. "c": "myc",
  40. },
  41. },
  42. result: []map[string]interface{}{{
  43. "a": strings.ToLower("2FD4E1C67A2D28FCED849EE1BB76E7391B93EB12"),
  44. }},
  45. },
  46. {
  47. sql: "SELECT sha256(a) AS a FROM test",
  48. data: &xsql.Tuple{
  49. Emitter: "test",
  50. Message: xsql.Message{
  51. "a": "The quick brown fox jumps over the lazy dog",
  52. "b": "myb",
  53. "c": "myc",
  54. },
  55. },
  56. result: []map[string]interface{}{{
  57. "a": strings.ToLower("D7A8FBB307D7809469CA9ABCB0082E4F8D5651E46D3CDB762D02D0BF37C9E592"),
  58. }},
  59. },
  60. {
  61. sql: "SELECT sha384(a) AS a FROM test",
  62. data: &xsql.Tuple{
  63. Emitter: "test",
  64. Message: xsql.Message{
  65. "a": "The quick brown fox jumps over the lazy dog",
  66. "b": "myb",
  67. "c": "myc",
  68. },
  69. },
  70. result: []map[string]interface{}{{
  71. "a": strings.ToLower("CA737F1014A48F4C0B6DD43CB177B0AFD9E5169367544C494011E3317DBF9A509CB1E5DC1E85A941BBEE3D7F2AFBC9B1"),
  72. }},
  73. },
  74. {
  75. sql: "SELECT sha512(a) AS a FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": "The quick brown fox jumps over the lazy dog",
  80. "b": "myb",
  81. "c": "myc",
  82. },
  83. },
  84. result: []map[string]interface{}{{
  85. "a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
  86. }},
  87. },
  88. {
  89. sql: "SELECT mqtt(topic) AS a FROM test",
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{},
  93. Metadata: xsql.Metadata{
  94. "topic": "devices/device_001/message",
  95. },
  96. },
  97. result: []map[string]interface{}{{
  98. "a": "devices/device_001/message",
  99. }},
  100. },
  101. {
  102. sql: "SELECT mqtt(topic) AS a FROM test",
  103. data: &xsql.Tuple{
  104. Emitter: "test",
  105. Message: xsql.Message{},
  106. Metadata: xsql.Metadata{
  107. "topic": "devices/device_001/message",
  108. },
  109. },
  110. result: []map[string]interface{}{{
  111. "a": "devices/device_001/message",
  112. }},
  113. },
  114. {
  115. sql: "SELECT topic, mqtt(topic) AS a FROM test",
  116. data: &xsql.Tuple{
  117. Emitter: "test",
  118. Message: xsql.Message{
  119. "topic": "fff",
  120. },
  121. Metadata: xsql.Metadata{
  122. "topic": "devices/device_001/message",
  123. },
  124. },
  125. result: []map[string]interface{}{{
  126. "topic": "fff",
  127. "a": "devices/device_001/message",
  128. }},
  129. },
  130. {
  131. sql: "SELECT cardinality(arr) as r FROM test",
  132. data: &xsql.Tuple{
  133. Emitter: "test",
  134. Message: xsql.Message{
  135. "temperature": 43.2,
  136. "arr": []int{},
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "r": float64(0),
  141. }},
  142. },
  143. {
  144. sql: "SELECT cardinality(arr) as r FROM test",
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "temperature": 43.2,
  149. "arr": []int{1, 2, 3, 4, 5},
  150. },
  151. },
  152. result: []map[string]interface{}{{
  153. "r": float64(5),
  154. }},
  155. },
  156. {
  157. sql: "SELECT isNull(arr) as r FROM test",
  158. data: &xsql.Tuple{
  159. Emitter: "test",
  160. Message: xsql.Message{
  161. "temperature": 43.2,
  162. "arr": []int{},
  163. },
  164. },
  165. result: []map[string]interface{}{{
  166. "r": false,
  167. }},
  168. },
  169. {
  170. sql: "SELECT isNull(arr) as r FROM test",
  171. data: &xsql.Tuple{
  172. Emitter: "test",
  173. Message: xsql.Message{
  174. "temperature": 43.2,
  175. "arr": []float64(nil),
  176. },
  177. },
  178. result: []map[string]interface{}{{
  179. "r": true,
  180. }},
  181. },
  182. {
  183. sql: "SELECT isNull(rec) as r FROM test",
  184. data: &xsql.Tuple{
  185. Emitter: "test",
  186. Message: xsql.Message{
  187. "temperature": 43.2,
  188. "rec": map[string]interface{}(nil),
  189. },
  190. },
  191. result: []map[string]interface{}{{
  192. "r": true,
  193. }},
  194. },
  195. {
  196. sql: "SELECT cast(a * 1000, \"datetime\") AS a FROM test",
  197. data: &xsql.Tuple{
  198. Emitter: "test",
  199. Message: xsql.Message{
  200. "a": float64(1.62000273e+09),
  201. "b": "ya",
  202. "c": "myc",
  203. },
  204. },
  205. result: []map[string]interface{}{{
  206. "a": "2021-05-03T00:45:30Z",
  207. }},
  208. },
  209. }
  210. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  211. contextLogger := common.Log.WithField("rule", "TestMiscFunc_Apply1")
  212. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  213. for i, tt := range tests {
  214. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  215. if err != nil || stmt == nil {
  216. t.Errorf("parse sql %s error %v", tt.sql, err)
  217. }
  218. pp := &ProjectOp{Fields: stmt.Fields}
  219. pp.isTest = true
  220. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  221. result := pp.Apply(ctx, tt.data, fv, afv)
  222. var mapRes []map[string]interface{}
  223. if v, ok := result.([]byte); ok {
  224. err := json.Unmarshal(v, &mapRes)
  225. if err != nil {
  226. t.Errorf("Failed to parse the input into map.\n")
  227. continue
  228. }
  229. if !reflect.DeepEqual(tt.result, mapRes) {
  230. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  231. }
  232. } else {
  233. t.Errorf("The returned result is not type of []byte\n")
  234. }
  235. }
  236. }
  237. func TestMqttFunc_Apply2(t *testing.T) {
  238. var tests = []struct {
  239. sql string
  240. data xsql.JoinTupleSets
  241. result []map[string]interface{}
  242. }{
  243. {
  244. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
  245. data: xsql.JoinTupleSets{
  246. xsql.JoinTuple{
  247. Tuples: []xsql.Tuple{
  248. {Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  249. {Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  250. },
  251. },
  252. },
  253. result: []map[string]interface{}{{
  254. "id1": "1",
  255. "a": "devices/type1/device001",
  256. "b": "devices/type2/device001",
  257. }},
  258. },
  259. }
  260. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  261. contextLogger := common.Log.WithField("rule", "TestMqttFunc_Apply2")
  262. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  263. for i, tt := range tests {
  264. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  265. if err != nil || stmt == nil {
  266. t.Errorf("parse sql %s error %v", tt.sql, err)
  267. }
  268. pp := &ProjectOp{Fields: stmt.Fields}
  269. pp.isTest = true
  270. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  271. result := pp.Apply(ctx, tt.data, fv, afv)
  272. var mapRes []map[string]interface{}
  273. if v, ok := result.([]byte); ok {
  274. err := json.Unmarshal(v, &mapRes)
  275. if err != nil {
  276. t.Errorf("Failed to parse the input into map.\n")
  277. continue
  278. }
  279. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  280. if !reflect.DeepEqual(tt.result, mapRes) {
  281. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  282. }
  283. } else {
  284. t.Errorf("The returned result is not type of []byte\n")
  285. }
  286. }
  287. }
  288. func TestMetaFunc_Apply1(t *testing.T) {
  289. var tests = []struct {
  290. sql string
  291. data interface{}
  292. result interface{}
  293. }{
  294. {
  295. sql: "SELECT topic, meta(topic) AS a FROM test",
  296. data: &xsql.Tuple{
  297. Emitter: "test",
  298. Message: xsql.Message{
  299. "topic": "fff",
  300. },
  301. Metadata: xsql.Metadata{
  302. "topic": "devices/device_001/message",
  303. },
  304. },
  305. result: []map[string]interface{}{{
  306. "topic": "fff",
  307. "a": "devices/device_001/message",
  308. }},
  309. },
  310. {
  311. sql: "SELECT meta(device) as d, meta(temperature->device) as r FROM test",
  312. data: &xsql.Tuple{
  313. Emitter: "test",
  314. Message: xsql.Message{
  315. "temperature": 43.2,
  316. },
  317. Metadata: xsql.Metadata{
  318. "temperature": map[string]interface{}{
  319. "id": "dfadfasfas",
  320. "device": "device2",
  321. },
  322. "device": "gateway",
  323. },
  324. },
  325. result: []map[string]interface{}{{
  326. "d": "gateway",
  327. "r": "device2",
  328. }},
  329. },
  330. {
  331. sql: "SELECT meta(*) as r FROM test",
  332. data: &xsql.Tuple{
  333. Emitter: "test",
  334. Message: xsql.Message{
  335. "temperature": 43.2,
  336. },
  337. Metadata: xsql.Metadata{
  338. "temperature": map[string]interface{}{
  339. "id": "dfadfasfas",
  340. "device": "device2",
  341. },
  342. "device": "gateway",
  343. },
  344. },
  345. result: []map[string]interface{}{{
  346. "r": map[string]interface{}{
  347. "temperature": map[string]interface{}{
  348. "id": "dfadfasfas",
  349. "device": "device2",
  350. },
  351. "device": "gateway",
  352. },
  353. }},
  354. },
  355. {
  356. sql: "SELECT topic, meta(`Light-diming`->device) AS a FROM test",
  357. data: &xsql.Tuple{
  358. Emitter: "test",
  359. Message: xsql.Message{
  360. "topic": "fff",
  361. },
  362. Metadata: xsql.Metadata{
  363. "Light-diming": map[string]interface{}{
  364. "device": "device2",
  365. },
  366. },
  367. },
  368. result: []map[string]interface{}{{
  369. "topic": "fff",
  370. "a": "device2",
  371. }},
  372. },
  373. }
  374. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  375. contextLogger := common.Log.WithField("rule", "TestMetaFunc_Apply1")
  376. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  377. for i, tt := range tests {
  378. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  379. if err != nil || stmt == nil {
  380. t.Errorf("parse sql %s error %v", tt.sql, err)
  381. }
  382. pp := &ProjectOp{Fields: stmt.Fields}
  383. pp.isTest = true
  384. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  385. result := pp.Apply(ctx, tt.data, fv, afv)
  386. var mapRes []map[string]interface{}
  387. if v, ok := result.([]byte); ok {
  388. err := json.Unmarshal(v, &mapRes)
  389. if err != nil {
  390. t.Errorf("Failed to parse the input into map.\n")
  391. continue
  392. }
  393. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  394. if !reflect.DeepEqual(tt.result, mapRes) {
  395. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  396. }
  397. } else {
  398. t.Errorf("The returned result is not type of []byte\n")
  399. }
  400. }
  401. }
  402. func TestJsonPathFunc_Apply1(t *testing.T) {
  403. var tests = []struct {
  404. sql string
  405. data interface{}
  406. result interface{}
  407. }{
  408. {
  409. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  410. data: &xsql.Tuple{
  411. Emitter: "test",
  412. Message: xsql.Message{
  413. "class": "warrior",
  414. "equipment": map[string]interface{}{
  415. "rings": []map[string]interface{}{
  416. {
  417. "name": "ring of despair",
  418. "weight": 0.1,
  419. }, {
  420. "name": "ring of strength",
  421. "weight": 2.4,
  422. },
  423. },
  424. "arm_right": "Sword of flame",
  425. "arm_left": "Shield of faith",
  426. },
  427. },
  428. },
  429. result: []map[string]interface{}{{
  430. "a": "Sword of flame",
  431. }},
  432. }, {
  433. sql: `SELECT json_path_query(equipment, "$.rings[*].weight") AS a FROM test`,
  434. data: &xsql.Tuple{
  435. Emitter: "test",
  436. Message: xsql.Message{
  437. "class": "warrior",
  438. "equipment": map[string]interface{}{
  439. "rings": []interface{}{
  440. map[string]interface{}{
  441. "name": "ring of despair",
  442. "weight": 0.1,
  443. }, map[string]interface{}{
  444. "name": "ring of strength",
  445. "weight": 2.4,
  446. },
  447. },
  448. "arm_right": "Sword of flame",
  449. "arm_left": "Shield of faith",
  450. },
  451. },
  452. },
  453. result: []map[string]interface{}{{
  454. "a": []interface{}{
  455. 0.1, 2.4,
  456. },
  457. }},
  458. }, {
  459. sql: `SELECT json_path_query_first(equipment, "$.rings[*].weight") AS a FROM test`,
  460. data: &xsql.Tuple{
  461. Emitter: "test",
  462. Message: xsql.Message{
  463. "class": "warrior",
  464. "equipment": map[string]interface{}{
  465. "rings": []interface{}{
  466. map[string]interface{}{
  467. "name": "ring of despair",
  468. "weight": 0.1,
  469. }, map[string]interface{}{
  470. "name": "ring of strength",
  471. "weight": 2.4,
  472. },
  473. },
  474. "arm_right": "Sword of flame",
  475. "arm_left": "Shield of faith",
  476. },
  477. },
  478. },
  479. result: []map[string]interface{}{{
  480. "a": 0.1,
  481. }},
  482. }, {
  483. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1]") AS a FROM test`,
  484. data: &xsql.Tuple{
  485. Emitter: "test",
  486. Message: xsql.Message{
  487. "class": "warrior",
  488. "equipment": map[string]interface{}{
  489. "rings": []interface{}{
  490. map[string]interface{}{
  491. "name": "ring of despair",
  492. "weight": 0.1,
  493. }, map[string]interface{}{
  494. "name": "ring of strength",
  495. "weight": 2.4,
  496. },
  497. },
  498. "arm_right": "Sword of flame",
  499. "arm_left": "Shield of faith",
  500. },
  501. },
  502. },
  503. result: []map[string]interface{}{{
  504. "a": []interface{}{
  505. map[string]interface{}{
  506. "name": "ring of strength",
  507. "weight": 2.4,
  508. },
  509. },
  510. }},
  511. }, {
  512. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1].name") AS a FROM test`,
  513. data: &xsql.Tuple{
  514. Emitter: "test",
  515. Message: xsql.Message{
  516. "class": "warrior",
  517. "equipment": map[string]interface{}{
  518. "rings": []interface{}{
  519. map[string]interface{}{
  520. "name": "ring of despair",
  521. "weight": 0.1,
  522. }, map[string]interface{}{
  523. "name": "ring of strength",
  524. "weight": 2.4,
  525. },
  526. },
  527. "arm_right": "Sword of flame",
  528. "arm_left": "Shield of faith",
  529. },
  530. },
  531. },
  532. result: []map[string]interface{}{{
  533. "a": []interface{}{
  534. "ring of strength",
  535. },
  536. }},
  537. }, {
  538. sql: `SELECT json_path_exists(equipment, "$.rings[? @.weight>5]") AS a FROM test`,
  539. data: &xsql.Tuple{
  540. Emitter: "test",
  541. Message: xsql.Message{
  542. "class": "warrior",
  543. "equipment": map[string]interface{}{
  544. "rings": []interface{}{
  545. map[string]interface{}{
  546. "name": "ring of despair",
  547. "weight": 0.1,
  548. }, map[string]interface{}{
  549. "name": "ring of strength",
  550. "weight": 2.4,
  551. },
  552. },
  553. "arm_right": "Sword of flame",
  554. "arm_left": "Shield of faith",
  555. },
  556. },
  557. },
  558. result: []map[string]interface{}{{
  559. "a": false,
  560. }},
  561. }, {
  562. sql: `SELECT json_path_exists(equipment, "$.ring1") AS a FROM test`,
  563. data: &xsql.Tuple{
  564. Emitter: "test",
  565. Message: xsql.Message{
  566. "class": "warrior",
  567. "equipment": map[string]interface{}{
  568. "rings": []interface{}{
  569. map[string]interface{}{
  570. "name": "ring of despair",
  571. "weight": 0.1,
  572. }, map[string]interface{}{
  573. "name": "ring of strength",
  574. "weight": 2.4,
  575. },
  576. },
  577. "arm_right": "Sword of flame",
  578. "arm_left": "Shield of faith",
  579. },
  580. },
  581. },
  582. result: []map[string]interface{}{{
  583. "a": false,
  584. }},
  585. }, {
  586. sql: `SELECT json_path_exists(equipment, "$.rings") AS a FROM test`,
  587. data: &xsql.Tuple{
  588. Emitter: "test",
  589. Message: xsql.Message{
  590. "class": "warrior",
  591. "equipment": map[string]interface{}{
  592. "rings": []interface{}{
  593. map[string]interface{}{
  594. "name": "ring of despair",
  595. "weight": 0.1,
  596. }, map[string]interface{}{
  597. "name": "ring of strength",
  598. "weight": 2.4,
  599. },
  600. },
  601. "arm_right": "Sword of flame",
  602. "arm_left": "Shield of faith",
  603. },
  604. },
  605. },
  606. result: []map[string]interface{}{{
  607. "a": true,
  608. }},
  609. }, {
  610. sql: `SELECT json_path_query(equipment, "$.rings[? (@.weight>1)].name") AS a FROM test`,
  611. data: &xsql.Tuple{
  612. Emitter: "test",
  613. Message: xsql.Message{
  614. "class": "warrior",
  615. "equipment": map[string]interface{}{
  616. "rings": []map[string]interface{}{
  617. {
  618. "name": "ring of despair",
  619. "weight": 0.1,
  620. }, {
  621. "name": "ring of strength",
  622. "weight": 2.4,
  623. },
  624. },
  625. "arm_right": "Sword of flame",
  626. "arm_left": "Shield of faith",
  627. },
  628. },
  629. },
  630. result: []map[string]interface{}{{
  631. "a": []interface{}{
  632. "ring of strength",
  633. },
  634. }},
  635. }, {
  636. sql: `SELECT json_path_query(equipment, "$.rings[*]") AS a FROM test`,
  637. data: &xsql.Tuple{
  638. Emitter: "test",
  639. Message: xsql.Message{
  640. "class": "warrior",
  641. "equipment": map[string]interface{}{
  642. "rings": []float64{
  643. 0.1, 2.4,
  644. },
  645. "arm_right": "Sword of flame",
  646. "arm_left": "Shield of faith",
  647. },
  648. },
  649. },
  650. result: []map[string]interface{}{{
  651. "a": []interface{}{
  652. 0.1, 2.4,
  653. },
  654. }},
  655. }, {
  656. sql: `SELECT json_path_query(equipment, "$.rings") AS a FROM test`,
  657. data: &xsql.Tuple{
  658. Emitter: "test",
  659. Message: xsql.Message{
  660. "class": "warrior",
  661. "equipment": map[string]interface{}{
  662. "rings": []float64{
  663. 0.1, 2.4,
  664. },
  665. "arm_right": "Sword of flame",
  666. "arm_left": "Shield of faith",
  667. },
  668. },
  669. },
  670. result: []map[string]interface{}{{
  671. "a": []interface{}{
  672. 0.1, 2.4,
  673. },
  674. }},
  675. }, {
  676. sql: `SELECT json_path_query(equipment, "$[0].rings[1]") AS a FROM test`,
  677. data: &xsql.Tuple{
  678. Emitter: "test",
  679. Message: xsql.Message{
  680. "class": "warrior",
  681. "equipment": []map[string]interface{}{
  682. {
  683. "rings": []float64{
  684. 0.1, 2.4,
  685. },
  686. "arm_right": "Sword of flame",
  687. "arm_left": "Shield of faith",
  688. },
  689. },
  690. },
  691. },
  692. result: []map[string]interface{}{{
  693. "a": 2.4,
  694. }},
  695. }, {
  696. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  697. data: &xsql.Tuple{
  698. Emitter: "test",
  699. Message: xsql.Message{
  700. "class": "warrior",
  701. "equipment": []map[string]interface{}{
  702. {
  703. "rings": []float64{
  704. 0.1, 2.4,
  705. },
  706. "arm.right": "Sword of flame",
  707. "arm.left": "Shield of faith",
  708. },
  709. },
  710. },
  711. },
  712. result: []map[string]interface{}{{
  713. "a": "Shield of faith",
  714. }},
  715. }, {
  716. sql: "SELECT json_path_query(equipment, \"$[\\\"arm.left\\\"]\") AS a FROM test",
  717. data: &xsql.Tuple{
  718. Emitter: "test",
  719. Message: xsql.Message{
  720. "class": "warrior",
  721. "equipment": `{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}`,
  722. },
  723. },
  724. result: []map[string]interface{}{{
  725. "a": "Shield of faith",
  726. }},
  727. }, {
  728. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  729. data: &xsql.Tuple{
  730. Emitter: "test",
  731. Message: xsql.Message{
  732. "class": "warrior",
  733. "equipment": `[{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}]`,
  734. },
  735. },
  736. result: []map[string]interface{}{{
  737. "a": "Shield of faith",
  738. }},
  739. },
  740. }
  741. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  742. contextLogger := common.Log.WithField("rule", "TestJsonFunc_Apply1")
  743. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  744. for i, tt := range tests {
  745. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  746. if err != nil || stmt == nil {
  747. t.Errorf("parse sql %s error %v", tt.sql, err)
  748. }
  749. pp := &ProjectOp{Fields: stmt.Fields}
  750. pp.isTest = true
  751. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  752. result := pp.Apply(ctx, tt.data, fv, afv)
  753. var mapRes []map[string]interface{}
  754. if v, ok := result.([]byte); ok {
  755. err := json.Unmarshal(v, &mapRes)
  756. if err != nil {
  757. t.Errorf("Failed to parse the input into map.\n")
  758. continue
  759. }
  760. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  761. if !reflect.DeepEqual(tt.result, mapRes) {
  762. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  763. }
  764. } else {
  765. t.Errorf("The returned result is not type of []byte but found %v", result)
  766. }
  767. }
  768. }