misc_func_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784
  1. package operator
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/internal/conf"
  6. "github.com/emqx/kuiper/internal/topo/context"
  7. "github.com/emqx/kuiper/internal/xsql"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestMiscFunc_Apply1(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data *xsql.Tuple
  16. result []map[string]interface{}
  17. }{
  18. {
  19. sql: "SELECT md5(a) AS a FROM test",
  20. data: &xsql.Tuple{
  21. Emitter: "test",
  22. Message: xsql.Message{
  23. "a": "The quick brown fox jumps over the lazy dog",
  24. "b": "myb",
  25. "c": "myc",
  26. },
  27. },
  28. result: []map[string]interface{}{{
  29. "a": strings.ToLower("9E107D9D372BB6826BD81D3542A419D6"),
  30. }},
  31. },
  32. {
  33. sql: "SELECT sha1(a) AS a FROM test",
  34. data: &xsql.Tuple{
  35. Emitter: "test",
  36. Message: xsql.Message{
  37. "a": "The quick brown fox jumps over the lazy dog",
  38. "b": "myb",
  39. "c": "myc",
  40. },
  41. },
  42. result: []map[string]interface{}{{
  43. "a": strings.ToLower("2FD4E1C67A2D28FCED849EE1BB76E7391B93EB12"),
  44. }},
  45. },
  46. {
  47. sql: "SELECT sha256(a) AS a FROM test",
  48. data: &xsql.Tuple{
  49. Emitter: "test",
  50. Message: xsql.Message{
  51. "a": "The quick brown fox jumps over the lazy dog",
  52. "b": "myb",
  53. "c": "myc",
  54. },
  55. },
  56. result: []map[string]interface{}{{
  57. "a": strings.ToLower("D7A8FBB307D7809469CA9ABCB0082E4F8D5651E46D3CDB762D02D0BF37C9E592"),
  58. }},
  59. },
  60. {
  61. sql: "SELECT sha384(a) AS a FROM test",
  62. data: &xsql.Tuple{
  63. Emitter: "test",
  64. Message: xsql.Message{
  65. "a": "The quick brown fox jumps over the lazy dog",
  66. "b": "myb",
  67. "c": "myc",
  68. },
  69. },
  70. result: []map[string]interface{}{{
  71. "a": strings.ToLower("CA737F1014A48F4C0B6DD43CB177B0AFD9E5169367544C494011E3317DBF9A509CB1E5DC1E85A941BBEE3D7F2AFBC9B1"),
  72. }},
  73. },
  74. {
  75. sql: "SELECT sha512(a) AS a FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": "The quick brown fox jumps over the lazy dog",
  80. "b": "myb",
  81. "c": "myc",
  82. },
  83. },
  84. result: []map[string]interface{}{{
  85. "a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
  86. }},
  87. },
  88. {
  89. sql: "SELECT mqtt(topic) AS a FROM test",
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{},
  93. Metadata: xsql.Metadata{
  94. "topic": "devices/device_001/message",
  95. },
  96. },
  97. result: []map[string]interface{}{{
  98. "a": "devices/device_001/message",
  99. }},
  100. },
  101. {
  102. sql: "SELECT mqtt(topic) AS a FROM test",
  103. data: &xsql.Tuple{
  104. Emitter: "test",
  105. Message: xsql.Message{},
  106. Metadata: xsql.Metadata{
  107. "topic": "devices/device_001/message",
  108. },
  109. },
  110. result: []map[string]interface{}{{
  111. "a": "devices/device_001/message",
  112. }},
  113. },
  114. {
  115. sql: "SELECT topic, mqtt(topic) AS a FROM test",
  116. data: &xsql.Tuple{
  117. Emitter: "test",
  118. Message: xsql.Message{
  119. "topic": "fff",
  120. },
  121. Metadata: xsql.Metadata{
  122. "topic": "devices/device_001/message",
  123. },
  124. },
  125. result: []map[string]interface{}{{
  126. "topic": "fff",
  127. "a": "devices/device_001/message",
  128. }},
  129. },
  130. {
  131. sql: "SELECT cardinality(arr) as r FROM test",
  132. data: &xsql.Tuple{
  133. Emitter: "test",
  134. Message: xsql.Message{
  135. "temperature": 43.2,
  136. "arr": []int{},
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "r": float64(0),
  141. }},
  142. },
  143. {
  144. sql: "SELECT cardinality(arr) as r FROM test",
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "temperature": 43.2,
  149. "arr": []int{1, 2, 3, 4, 5},
  150. },
  151. },
  152. result: []map[string]interface{}{{
  153. "r": float64(5),
  154. }},
  155. },
  156. {
  157. sql: "SELECT isNull(arr) as r FROM test",
  158. data: &xsql.Tuple{
  159. Emitter: "test",
  160. Message: xsql.Message{
  161. "temperature": 43.2,
  162. "arr": []int{},
  163. },
  164. },
  165. result: []map[string]interface{}{{
  166. "r": false,
  167. }},
  168. },
  169. {
  170. sql: "SELECT isNull(arr) as r FROM test",
  171. data: &xsql.Tuple{
  172. Emitter: "test",
  173. Message: xsql.Message{
  174. "temperature": 43.2,
  175. "arr": []float64(nil),
  176. },
  177. },
  178. result: []map[string]interface{}{{
  179. "r": true,
  180. }},
  181. },
  182. {
  183. sql: "SELECT isNull(rec) as r FROM test",
  184. data: &xsql.Tuple{
  185. Emitter: "test",
  186. Message: xsql.Message{
  187. "temperature": 43.2,
  188. "rec": map[string]interface{}(nil),
  189. },
  190. },
  191. result: []map[string]interface{}{{
  192. "r": true,
  193. }},
  194. },
  195. {
  196. sql: "SELECT cast(a * 1000, \"datetime\") AS a FROM test",
  197. data: &xsql.Tuple{
  198. Emitter: "test",
  199. Message: xsql.Message{
  200. "a": 1.62000273e+09,
  201. "b": "ya",
  202. "c": "myc",
  203. },
  204. },
  205. result: []map[string]interface{}{{
  206. "a": "2021-05-03T00:45:30Z",
  207. }},
  208. },
  209. }
  210. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  211. contextLogger := conf.Log.WithField("rule", "TestMiscFunc_Apply1")
  212. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  213. for i, tt := range tests {
  214. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  215. if err != nil || stmt == nil {
  216. t.Errorf("parse sql %s error %v", tt.sql, err)
  217. }
  218. pp := &ProjectOp{Fields: stmt.Fields}
  219. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  220. result := pp.Apply(ctx, tt.data, fv, afv)
  221. var mapRes []map[string]interface{}
  222. if v, ok := result.([]byte); ok {
  223. err := json.Unmarshal(v, &mapRes)
  224. if err != nil {
  225. t.Errorf("Failed to parse the input into map.\n")
  226. continue
  227. }
  228. if !reflect.DeepEqual(tt.result, mapRes) {
  229. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  230. }
  231. } else {
  232. t.Errorf("The returned result is not type of []byte\n")
  233. }
  234. }
  235. }
  236. func TestMqttFunc_Apply2(t *testing.T) {
  237. var tests = []struct {
  238. sql string
  239. data *xsql.JoinTupleSets
  240. result []map[string]interface{}
  241. }{
  242. {
  243. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
  244. data: &xsql.JoinTupleSets{
  245. Content: []xsql.JoinTuple{
  246. {
  247. Tuples: []xsql.Tuple{
  248. {Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  249. {Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  250. },
  251. },
  252. },
  253. },
  254. result: []map[string]interface{}{{
  255. "id1": "1",
  256. "a": "devices/type1/device001",
  257. "b": "devices/type2/device001",
  258. }},
  259. },
  260. }
  261. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  262. contextLogger := conf.Log.WithField("rule", "TestMqttFunc_Apply2")
  263. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  264. for i, tt := range tests {
  265. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  266. if err != nil || stmt == nil {
  267. t.Errorf("parse sql %s error %v", tt.sql, err)
  268. }
  269. pp := &ProjectOp{Fields: stmt.Fields}
  270. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  271. result := pp.Apply(ctx, tt.data, fv, afv)
  272. var mapRes []map[string]interface{}
  273. if v, ok := result.([]byte); ok {
  274. err := json.Unmarshal(v, &mapRes)
  275. if err != nil {
  276. t.Errorf("Failed to parse the input into map.\n")
  277. continue
  278. }
  279. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  280. if !reflect.DeepEqual(tt.result, mapRes) {
  281. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  282. }
  283. } else {
  284. t.Errorf("The returned result is not type of []byte\n")
  285. }
  286. }
  287. }
  288. func TestMetaFunc_Apply1(t *testing.T) {
  289. var tests = []struct {
  290. sql string
  291. data interface{}
  292. result interface{}
  293. }{
  294. {
  295. sql: "SELECT topic, meta(topic) AS a FROM test",
  296. data: &xsql.Tuple{
  297. Emitter: "test",
  298. Message: xsql.Message{
  299. "topic": "fff",
  300. },
  301. Metadata: xsql.Metadata{
  302. "topic": "devices/device_001/message",
  303. },
  304. },
  305. result: []map[string]interface{}{{
  306. "topic": "fff",
  307. "a": "devices/device_001/message",
  308. }},
  309. },
  310. {
  311. sql: "SELECT meta(device) as d, meta(temperature->device) as r FROM test",
  312. data: &xsql.Tuple{
  313. Emitter: "test",
  314. Message: xsql.Message{
  315. "temperature": 43.2,
  316. },
  317. Metadata: xsql.Metadata{
  318. "temperature": map[string]interface{}{
  319. "id": "dfadfasfas",
  320. "device": "device2",
  321. },
  322. "device": "gateway",
  323. },
  324. },
  325. result: []map[string]interface{}{{
  326. "d": "gateway",
  327. "r": "device2",
  328. }},
  329. },
  330. {
  331. sql: "SELECT meta(*) as r FROM test",
  332. data: &xsql.Tuple{
  333. Emitter: "test",
  334. Message: xsql.Message{
  335. "temperature": 43.2,
  336. },
  337. Metadata: xsql.Metadata{
  338. "temperature": map[string]interface{}{
  339. "id": "dfadfasfas",
  340. "device": "device2",
  341. },
  342. "device": "gateway",
  343. },
  344. },
  345. result: []map[string]interface{}{{
  346. "r": map[string]interface{}{
  347. "temperature": map[string]interface{}{
  348. "id": "dfadfasfas",
  349. "device": "device2",
  350. },
  351. "device": "gateway",
  352. },
  353. }},
  354. },
  355. {
  356. sql: "SELECT topic, meta(`Light-diming`->device) AS a FROM test",
  357. data: &xsql.Tuple{
  358. Emitter: "test",
  359. Message: xsql.Message{
  360. "topic": "fff",
  361. },
  362. Metadata: xsql.Metadata{
  363. "Light-diming": map[string]interface{}{
  364. "device": "device2",
  365. },
  366. },
  367. },
  368. result: []map[string]interface{}{{
  369. "topic": "fff",
  370. "a": "device2",
  371. }},
  372. },
  373. }
  374. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  375. contextLogger := conf.Log.WithField("rule", "TestMetaFunc_Apply1")
  376. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  377. for i, tt := range tests {
  378. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  379. if err != nil || stmt == nil {
  380. t.Errorf("parse sql %s error %v", tt.sql, err)
  381. }
  382. pp := &ProjectOp{Fields: stmt.Fields}
  383. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  384. result := pp.Apply(ctx, tt.data, fv, afv)
  385. var mapRes []map[string]interface{}
  386. if v, ok := result.([]byte); ok {
  387. err := json.Unmarshal(v, &mapRes)
  388. if err != nil {
  389. t.Errorf("Failed to parse the input into map.\n")
  390. continue
  391. }
  392. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  393. if !reflect.DeepEqual(tt.result, mapRes) {
  394. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  395. }
  396. } else {
  397. t.Errorf("The returned result is not type of []byte\n")
  398. }
  399. }
  400. }
  401. func TestJsonPathFunc_Apply1(t *testing.T) {
  402. var tests = []struct {
  403. sql string
  404. data interface{}
  405. result interface{}
  406. }{
  407. {
  408. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  409. data: &xsql.Tuple{
  410. Emitter: "test",
  411. Message: xsql.Message{
  412. "class": "warrior",
  413. "equipment": map[string]interface{}{
  414. "rings": []map[string]interface{}{
  415. {
  416. "name": "ring of despair",
  417. "weight": 0.1,
  418. }, {
  419. "name": "ring of strength",
  420. "weight": 2.4,
  421. },
  422. },
  423. "arm_right": "Sword of flame",
  424. "arm_left": "Shield of faith",
  425. },
  426. },
  427. },
  428. result: []map[string]interface{}{{
  429. "a": "Sword of flame",
  430. }},
  431. }, {
  432. sql: `SELECT json_path_query(equipment, "$.rings[*].weight") AS a FROM test`,
  433. data: &xsql.Tuple{
  434. Emitter: "test",
  435. Message: xsql.Message{
  436. "class": "warrior",
  437. "equipment": map[string]interface{}{
  438. "rings": []interface{}{
  439. map[string]interface{}{
  440. "name": "ring of despair",
  441. "weight": 0.1,
  442. }, map[string]interface{}{
  443. "name": "ring of strength",
  444. "weight": 2.4,
  445. },
  446. },
  447. "arm_right": "Sword of flame",
  448. "arm_left": "Shield of faith",
  449. },
  450. },
  451. },
  452. result: []map[string]interface{}{{
  453. "a": []interface{}{
  454. 0.1, 2.4,
  455. },
  456. }},
  457. }, {
  458. sql: `SELECT json_path_query_first(equipment, "$.rings[*].weight") AS a FROM test`,
  459. data: &xsql.Tuple{
  460. Emitter: "test",
  461. Message: xsql.Message{
  462. "class": "warrior",
  463. "equipment": map[string]interface{}{
  464. "rings": []interface{}{
  465. map[string]interface{}{
  466. "name": "ring of despair",
  467. "weight": 0.1,
  468. }, map[string]interface{}{
  469. "name": "ring of strength",
  470. "weight": 2.4,
  471. },
  472. },
  473. "arm_right": "Sword of flame",
  474. "arm_left": "Shield of faith",
  475. },
  476. },
  477. },
  478. result: []map[string]interface{}{{
  479. "a": 0.1,
  480. }},
  481. }, {
  482. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1]") AS a FROM test`,
  483. data: &xsql.Tuple{
  484. Emitter: "test",
  485. Message: xsql.Message{
  486. "class": "warrior",
  487. "equipment": map[string]interface{}{
  488. "rings": []interface{}{
  489. map[string]interface{}{
  490. "name": "ring of despair",
  491. "weight": 0.1,
  492. }, map[string]interface{}{
  493. "name": "ring of strength",
  494. "weight": 2.4,
  495. },
  496. },
  497. "arm_right": "Sword of flame",
  498. "arm_left": "Shield of faith",
  499. },
  500. },
  501. },
  502. result: []map[string]interface{}{{
  503. "a": []interface{}{
  504. map[string]interface{}{
  505. "name": "ring of strength",
  506. "weight": 2.4,
  507. },
  508. },
  509. }},
  510. }, {
  511. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1].name") AS a FROM test`,
  512. data: &xsql.Tuple{
  513. Emitter: "test",
  514. Message: xsql.Message{
  515. "class": "warrior",
  516. "equipment": map[string]interface{}{
  517. "rings": []interface{}{
  518. map[string]interface{}{
  519. "name": "ring of despair",
  520. "weight": 0.1,
  521. }, map[string]interface{}{
  522. "name": "ring of strength",
  523. "weight": 2.4,
  524. },
  525. },
  526. "arm_right": "Sword of flame",
  527. "arm_left": "Shield of faith",
  528. },
  529. },
  530. },
  531. result: []map[string]interface{}{{
  532. "a": []interface{}{
  533. "ring of strength",
  534. },
  535. }},
  536. }, {
  537. sql: `SELECT json_path_exists(equipment, "$.rings[? @.weight>5]") AS a FROM test`,
  538. data: &xsql.Tuple{
  539. Emitter: "test",
  540. Message: xsql.Message{
  541. "class": "warrior",
  542. "equipment": map[string]interface{}{
  543. "rings": []interface{}{
  544. map[string]interface{}{
  545. "name": "ring of despair",
  546. "weight": 0.1,
  547. }, map[string]interface{}{
  548. "name": "ring of strength",
  549. "weight": 2.4,
  550. },
  551. },
  552. "arm_right": "Sword of flame",
  553. "arm_left": "Shield of faith",
  554. },
  555. },
  556. },
  557. result: []map[string]interface{}{{
  558. "a": false,
  559. }},
  560. }, {
  561. sql: `SELECT json_path_exists(equipment, "$.ring1") AS a FROM test`,
  562. data: &xsql.Tuple{
  563. Emitter: "test",
  564. Message: xsql.Message{
  565. "class": "warrior",
  566. "equipment": map[string]interface{}{
  567. "rings": []interface{}{
  568. map[string]interface{}{
  569. "name": "ring of despair",
  570. "weight": 0.1,
  571. }, map[string]interface{}{
  572. "name": "ring of strength",
  573. "weight": 2.4,
  574. },
  575. },
  576. "arm_right": "Sword of flame",
  577. "arm_left": "Shield of faith",
  578. },
  579. },
  580. },
  581. result: []map[string]interface{}{{
  582. "a": false,
  583. }},
  584. }, {
  585. sql: `SELECT json_path_exists(equipment, "$.rings") AS a FROM test`,
  586. data: &xsql.Tuple{
  587. Emitter: "test",
  588. Message: xsql.Message{
  589. "class": "warrior",
  590. "equipment": map[string]interface{}{
  591. "rings": []interface{}{
  592. map[string]interface{}{
  593. "name": "ring of despair",
  594. "weight": 0.1,
  595. }, map[string]interface{}{
  596. "name": "ring of strength",
  597. "weight": 2.4,
  598. },
  599. },
  600. "arm_right": "Sword of flame",
  601. "arm_left": "Shield of faith",
  602. },
  603. },
  604. },
  605. result: []map[string]interface{}{{
  606. "a": true,
  607. }},
  608. }, {
  609. sql: `SELECT json_path_query(equipment, "$.rings[? (@.weight>1)].name") AS a FROM test`,
  610. data: &xsql.Tuple{
  611. Emitter: "test",
  612. Message: xsql.Message{
  613. "class": "warrior",
  614. "equipment": map[string]interface{}{
  615. "rings": []map[string]interface{}{
  616. {
  617. "name": "ring of despair",
  618. "weight": 0.1,
  619. }, {
  620. "name": "ring of strength",
  621. "weight": 2.4,
  622. },
  623. },
  624. "arm_right": "Sword of flame",
  625. "arm_left": "Shield of faith",
  626. },
  627. },
  628. },
  629. result: []map[string]interface{}{{
  630. "a": []interface{}{
  631. "ring of strength",
  632. },
  633. }},
  634. }, {
  635. sql: `SELECT json_path_query(equipment, "$.rings[*]") AS a FROM test`,
  636. data: &xsql.Tuple{
  637. Emitter: "test",
  638. Message: xsql.Message{
  639. "class": "warrior",
  640. "equipment": map[string]interface{}{
  641. "rings": []float64{
  642. 0.1, 2.4,
  643. },
  644. "arm_right": "Sword of flame",
  645. "arm_left": "Shield of faith",
  646. },
  647. },
  648. },
  649. result: []map[string]interface{}{{
  650. "a": []interface{}{
  651. 0.1, 2.4,
  652. },
  653. }},
  654. }, {
  655. sql: `SELECT json_path_query(equipment, "$.rings") AS a FROM test`,
  656. data: &xsql.Tuple{
  657. Emitter: "test",
  658. Message: xsql.Message{
  659. "class": "warrior",
  660. "equipment": map[string]interface{}{
  661. "rings": []float64{
  662. 0.1, 2.4,
  663. },
  664. "arm_right": "Sword of flame",
  665. "arm_left": "Shield of faith",
  666. },
  667. },
  668. },
  669. result: []map[string]interface{}{{
  670. "a": []interface{}{
  671. 0.1, 2.4,
  672. },
  673. }},
  674. }, {
  675. sql: `SELECT json_path_query(equipment, "$[0].rings[1]") AS a FROM test`,
  676. data: &xsql.Tuple{
  677. Emitter: "test",
  678. Message: xsql.Message{
  679. "class": "warrior",
  680. "equipment": []map[string]interface{}{
  681. {
  682. "rings": []float64{
  683. 0.1, 2.4,
  684. },
  685. "arm_right": "Sword of flame",
  686. "arm_left": "Shield of faith",
  687. },
  688. },
  689. },
  690. },
  691. result: []map[string]interface{}{{
  692. "a": 2.4,
  693. }},
  694. }, {
  695. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  696. data: &xsql.Tuple{
  697. Emitter: "test",
  698. Message: xsql.Message{
  699. "class": "warrior",
  700. "equipment": []map[string]interface{}{
  701. {
  702. "rings": []float64{
  703. 0.1, 2.4,
  704. },
  705. "arm.right": "Sword of flame",
  706. "arm.left": "Shield of faith",
  707. },
  708. },
  709. },
  710. },
  711. result: []map[string]interface{}{{
  712. "a": "Shield of faith",
  713. }},
  714. }, {
  715. sql: "SELECT json_path_query(equipment, \"$[\\\"arm.left\\\"]\") AS a FROM test",
  716. data: &xsql.Tuple{
  717. Emitter: "test",
  718. Message: xsql.Message{
  719. "class": "warrior",
  720. "equipment": `{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}`,
  721. },
  722. },
  723. result: []map[string]interface{}{{
  724. "a": "Shield of faith",
  725. }},
  726. }, {
  727. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  728. data: &xsql.Tuple{
  729. Emitter: "test",
  730. Message: xsql.Message{
  731. "class": "warrior",
  732. "equipment": `[{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}]`,
  733. },
  734. },
  735. result: []map[string]interface{}{{
  736. "a": "Shield of faith",
  737. }},
  738. },
  739. }
  740. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  741. contextLogger := conf.Log.WithField("rule", "TestJsonFunc_Apply1")
  742. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  743. for i, tt := range tests {
  744. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  745. if err != nil || stmt == nil {
  746. t.Errorf("parse sql %s error %v", tt.sql, err)
  747. }
  748. pp := &ProjectOp{Fields: stmt.Fields}
  749. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  750. result := pp.Apply(ctx, tt.data, fv, afv)
  751. var mapRes []map[string]interface{}
  752. if v, ok := result.([]byte); ok {
  753. err := json.Unmarshal(v, &mapRes)
  754. if err != nil {
  755. t.Errorf("Failed to parse the input into map.\n")
  756. continue
  757. }
  758. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  759. if !reflect.DeepEqual(tt.result, mapRes) {
  760. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  761. }
  762. } else {
  763. t.Errorf("The returned result is not type of []byte but found %v", result)
  764. }
  765. }
  766. }