misc_func_test.go 18 KB


  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestMiscFunc_Apply1(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data *xsql.Tuple
  16. result []map[string]interface{}
  17. }{
  18. {
  19. sql: "SELECT md5(a) AS a FROM test",
  20. data: &xsql.Tuple{
  21. Emitter: "test",
  22. Message: xsql.Message{
  23. "a": "The quick brown fox jumps over the lazy dog",
  24. "b": "myb",
  25. "c": "myc",
  26. },
  27. },
  28. result: []map[string]interface{}{{
  29. "a": strings.ToLower("9E107D9D372BB6826BD81D3542A419D6"),
  30. }},
  31. },
  32. {
  33. sql: "SELECT sha1(a) AS a FROM test",
  34. data: &xsql.Tuple{
  35. Emitter: "test",
  36. Message: xsql.Message{
  37. "a": "The quick brown fox jumps over the lazy dog",
  38. "b": "myb",
  39. "c": "myc",
  40. },
  41. },
  42. result: []map[string]interface{}{{
  43. "a": strings.ToLower("2FD4E1C67A2D28FCED849EE1BB76E7391B93EB12"),
  44. }},
  45. },
  46. {
  47. sql: "SELECT sha256(a) AS a FROM test",
  48. data: &xsql.Tuple{
  49. Emitter: "test",
  50. Message: xsql.Message{
  51. "a": "The quick brown fox jumps over the lazy dog",
  52. "b": "myb",
  53. "c": "myc",
  54. },
  55. },
  56. result: []map[string]interface{}{{
  57. "a": strings.ToLower("D7A8FBB307D7809469CA9ABCB0082E4F8D5651E46D3CDB762D02D0BF37C9E592"),
  58. }},
  59. },
  60. {
  61. sql: "SELECT sha384(a) AS a FROM test",
  62. data: &xsql.Tuple{
  63. Emitter: "test",
  64. Message: xsql.Message{
  65. "a": "The quick brown fox jumps over the lazy dog",
  66. "b": "myb",
  67. "c": "myc",
  68. },
  69. },
  70. result: []map[string]interface{}{{
  71. "a": strings.ToLower("CA737F1014A48F4C0B6DD43CB177B0AFD9E5169367544C494011E3317DBF9A509CB1E5DC1E85A941BBEE3D7F2AFBC9B1"),
  72. }},
  73. },
  74. {
  75. sql: "SELECT sha512(a) AS a FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": "The quick brown fox jumps over the lazy dog",
  80. "b": "myb",
  81. "c": "myc",
  82. },
  83. },
  84. result: []map[string]interface{}{{
  85. "a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
  86. }},
  87. },
  88. {
  89. sql: "SELECT mqtt(topic) AS a FROM test",
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{},
  93. Metadata: xsql.Metadata{
  94. "topic": "devices/device_001/message",
  95. },
  96. },
  97. result: []map[string]interface{}{{
  98. "a": "devices/device_001/message",
  99. }},
  100. },
  101. {
  102. sql: "SELECT mqtt(topic) AS a FROM test",
  103. data: &xsql.Tuple{
  104. Emitter: "test",
  105. Message: xsql.Message{},
  106. Metadata: xsql.Metadata{
  107. "topic": "devices/device_001/message",
  108. },
  109. },
  110. result: []map[string]interface{}{{
  111. "a": "devices/device_001/message",
  112. }},
  113. },
  114. {
  115. sql: "SELECT topic, mqtt(topic) AS a FROM test",
  116. data: &xsql.Tuple{
  117. Emitter: "test",
  118. Message: xsql.Message{
  119. "topic": "fff",
  120. },
  121. Metadata: xsql.Metadata{
  122. "topic": "devices/device_001/message",
  123. },
  124. },
  125. result: []map[string]interface{}{{
  126. "topic": "fff",
  127. "a": "devices/device_001/message",
  128. }},
  129. },
  130. {
  131. sql: "SELECT isNull(arr) as r FROM test",
  132. data: &xsql.Tuple{
  133. Emitter: "test",
  134. Message: xsql.Message{
  135. "temperature": 43.2,
  136. "arr": []int{},
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "r": false,
  141. }},
  142. },
  143. {
  144. sql: "SELECT isNull(arr) as r FROM test",
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "temperature": 43.2,
  149. "arr": []float64(nil),
  150. },
  151. },
  152. result: []map[string]interface{}{{
  153. "r": true,
  154. }},
  155. }, {
  156. sql: "SELECT isNull(rec) as r FROM test",
  157. data: &xsql.Tuple{
  158. Emitter: "test",
  159. Message: xsql.Message{
  160. "temperature": 43.2,
  161. "rec": map[string]interface{}(nil),
  162. },
  163. },
  164. result: []map[string]interface{}{{
  165. "r": true,
  166. }},
  167. },
  168. }
  169. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  170. contextLogger := common.Log.WithField("rule", "TestMiscFunc_Apply1")
  171. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  172. for i, tt := range tests {
  173. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  174. if err != nil || stmt == nil {
  175. t.Errorf("parse sql %s error %v", tt.sql, err)
  176. }
  177. pp := &ProjectPlan{Fields: stmt.Fields}
  178. pp.isTest = true
  179. fv, afv := xsql.NewFunctionValuersForOp(nil)
  180. result := pp.Apply(ctx, tt.data, fv, afv)
  181. var mapRes []map[string]interface{}
  182. if v, ok := result.([]byte); ok {
  183. err := json.Unmarshal(v, &mapRes)
  184. if err != nil {
  185. t.Errorf("Failed to parse the input into map.\n")
  186. continue
  187. }
  188. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  189. if !reflect.DeepEqual(tt.result, mapRes) {
  190. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  191. }
  192. } else {
  193. t.Errorf("The returned result is not type of []byte\n")
  194. }
  195. }
  196. }
  197. func TestMqttFunc_Apply2(t *testing.T) {
  198. var tests = []struct {
  199. sql string
  200. data xsql.JoinTupleSets
  201. result []map[string]interface{}
  202. }{
  203. {
  204. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
  205. data: xsql.JoinTupleSets{
  206. xsql.JoinTuple{
  207. Tuples: []xsql.Tuple{
  208. {Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  209. {Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  210. },
  211. },
  212. },
  213. result: []map[string]interface{}{{
  214. "id1": "1",
  215. "a": "devices/type1/device001",
  216. "b": "devices/type2/device001",
  217. }},
  218. },
  219. }
  220. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  221. contextLogger := common.Log.WithField("rule", "TestMqttFunc_Apply2")
  222. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  223. for i, tt := range tests {
  224. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  225. if err != nil || stmt == nil {
  226. t.Errorf("parse sql %s error %v", tt.sql, err)
  227. }
  228. pp := &ProjectPlan{Fields: stmt.Fields}
  229. pp.isTest = true
  230. fv, afv := xsql.NewFunctionValuersForOp(nil)
  231. result := pp.Apply(ctx, tt.data, fv, afv)
  232. var mapRes []map[string]interface{}
  233. if v, ok := result.([]byte); ok {
  234. err := json.Unmarshal(v, &mapRes)
  235. if err != nil {
  236. t.Errorf("Failed to parse the input into map.\n")
  237. continue
  238. }
  239. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  240. if !reflect.DeepEqual(tt.result, mapRes) {
  241. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  242. }
  243. } else {
  244. t.Errorf("The returned result is not type of []byte\n")
  245. }
  246. }
  247. }
  248. func TestMetaFunc_Apply1(t *testing.T) {
  249. var tests = []struct {
  250. sql string
  251. data interface{}
  252. result interface{}
  253. }{
  254. {
  255. sql: "SELECT topic, meta(topic) AS a FROM test",
  256. data: &xsql.Tuple{
  257. Emitter: "test",
  258. Message: xsql.Message{
  259. "topic": "fff",
  260. },
  261. Metadata: xsql.Metadata{
  262. "topic": "devices/device_001/message",
  263. },
  264. },
  265. result: []map[string]interface{}{{
  266. "topic": "fff",
  267. "a": "devices/device_001/message",
  268. }},
  269. },
  270. {
  271. sql: "SELECT meta(device) as d, meta(temperature->device) as r FROM test",
  272. data: &xsql.Tuple{
  273. Emitter: "test",
  274. Message: xsql.Message{
  275. "temperature": 43.2,
  276. },
  277. Metadata: xsql.Metadata{
  278. "temperature": map[string]interface{}{
  279. "id": "dfadfasfas",
  280. "device": "device2",
  281. },
  282. "device": "gateway",
  283. },
  284. },
  285. result: []map[string]interface{}{{
  286. "d": "gateway",
  287. "r": "device2",
  288. }},
  289. },
  290. {
  291. sql: "SELECT meta(*) as r FROM test",
  292. data: &xsql.Tuple{
  293. Emitter: "test",
  294. Message: xsql.Message{
  295. "temperature": 43.2,
  296. },
  297. Metadata: xsql.Metadata{
  298. "temperature": map[string]interface{}{
  299. "id": "dfadfasfas",
  300. "device": "device2",
  301. },
  302. "device": "gateway",
  303. },
  304. },
  305. result: []map[string]interface{}{{
  306. "r": map[string]interface{}{
  307. "temperature": map[string]interface{}{
  308. "id": "dfadfasfas",
  309. "device": "device2",
  310. },
  311. "device": "gateway",
  312. },
  313. }},
  314. },
  315. {
  316. sql: "SELECT topic, meta(`Light-diming`->device) AS a FROM test",
  317. data: &xsql.Tuple{
  318. Emitter: "test",
  319. Message: xsql.Message{
  320. "topic": "fff",
  321. },
  322. Metadata: xsql.Metadata{
  323. "Light-diming": map[string]interface{}{
  324. "device": "device2",
  325. },
  326. },
  327. },
  328. result: []map[string]interface{}{{
  329. "topic": "fff",
  330. "a": "device2",
  331. }},
  332. },
  333. }
  334. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  335. contextLogger := common.Log.WithField("rule", "TestMetaFunc_Apply1")
  336. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  337. for i, tt := range tests {
  338. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  339. if err != nil || stmt == nil {
  340. t.Errorf("parse sql %s error %v", tt.sql, err)
  341. }
  342. pp := &ProjectPlan{Fields: stmt.Fields}
  343. pp.isTest = true
  344. fv, afv := xsql.NewFunctionValuersForOp(nil)
  345. result := pp.Apply(ctx, tt.data, fv, afv)
  346. var mapRes []map[string]interface{}
  347. if v, ok := result.([]byte); ok {
  348. err := json.Unmarshal(v, &mapRes)
  349. if err != nil {
  350. t.Errorf("Failed to parse the input into map.\n")
  351. continue
  352. }
  353. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  354. if !reflect.DeepEqual(tt.result, mapRes) {
  355. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  356. }
  357. } else {
  358. t.Errorf("The returned result is not type of []byte\n")
  359. }
  360. }
  361. }
  362. func TestJsonPathFunc_Apply1(t *testing.T) {
  363. var tests = []struct {
  364. sql string
  365. data interface{}
  366. result interface{}
  367. }{
  368. {
  369. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  370. data: &xsql.Tuple{
  371. Emitter: "test",
  372. Message: xsql.Message{
  373. "class": "warrior",
  374. "equipment": map[string]interface{}{
  375. "rings": []map[string]interface{}{
  376. {
  377. "name": "ring of despair",
  378. "weight": 0.1,
  379. }, {
  380. "name": "ring of strength",
  381. "weight": 2.4,
  382. },
  383. },
  384. "arm_right": "Sword of flame",
  385. "arm_left": "Shield of faith",
  386. },
  387. },
  388. },
  389. result: []map[string]interface{}{{
  390. "a": "Sword of flame",
  391. }},
  392. }, {
  393. sql: `SELECT json_path_query(equipment, "$.rings[*].weight") AS a FROM test`,
  394. data: &xsql.Tuple{
  395. Emitter: "test",
  396. Message: xsql.Message{
  397. "class": "warrior",
  398. "equipment": map[string]interface{}{
  399. "rings": []interface{}{
  400. map[string]interface{}{
  401. "name": "ring of despair",
  402. "weight": 0.1,
  403. }, map[string]interface{}{
  404. "name": "ring of strength",
  405. "weight": 2.4,
  406. },
  407. },
  408. "arm_right": "Sword of flame",
  409. "arm_left": "Shield of faith",
  410. },
  411. },
  412. },
  413. result: []map[string]interface{}{{
  414. "a": []interface{}{
  415. 0.1, 2.4,
  416. },
  417. }},
  418. }, {
  419. sql: `SELECT json_path_query_first(equipment, "$.rings[*].weight") AS a FROM test`,
  420. data: &xsql.Tuple{
  421. Emitter: "test",
  422. Message: xsql.Message{
  423. "class": "warrior",
  424. "equipment": map[string]interface{}{
  425. "rings": []interface{}{
  426. map[string]interface{}{
  427. "name": "ring of despair",
  428. "weight": 0.1,
  429. }, map[string]interface{}{
  430. "name": "ring of strength",
  431. "weight": 2.4,
  432. },
  433. },
  434. "arm_right": "Sword of flame",
  435. "arm_left": "Shield of faith",
  436. },
  437. },
  438. },
  439. result: []map[string]interface{}{{
  440. "a": 0.1,
  441. }},
  442. }, {
  443. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1]") AS a FROM test`,
  444. data: &xsql.Tuple{
  445. Emitter: "test",
  446. Message: xsql.Message{
  447. "class": "warrior",
  448. "equipment": map[string]interface{}{
  449. "rings": []interface{}{
  450. map[string]interface{}{
  451. "name": "ring of despair",
  452. "weight": 0.1,
  453. }, map[string]interface{}{
  454. "name": "ring of strength",
  455. "weight": 2.4,
  456. },
  457. },
  458. "arm_right": "Sword of flame",
  459. "arm_left": "Shield of faith",
  460. },
  461. },
  462. },
  463. result: []map[string]interface{}{{
  464. "a": []interface{}{
  465. map[string]interface{}{
  466. "name": "ring of strength",
  467. "weight": 2.4,
  468. },
  469. },
  470. }},
  471. }, {
  472. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1].name") AS a FROM test`,
  473. data: &xsql.Tuple{
  474. Emitter: "test",
  475. Message: xsql.Message{
  476. "class": "warrior",
  477. "equipment": map[string]interface{}{
  478. "rings": []interface{}{
  479. map[string]interface{}{
  480. "name": "ring of despair",
  481. "weight": 0.1,
  482. }, map[string]interface{}{
  483. "name": "ring of strength",
  484. "weight": 2.4,
  485. },
  486. },
  487. "arm_right": "Sword of flame",
  488. "arm_left": "Shield of faith",
  489. },
  490. },
  491. },
  492. result: []map[string]interface{}{{
  493. "a": []interface{}{
  494. "ring of strength",
  495. },
  496. }},
  497. }, {
  498. sql: `SELECT json_path_exists(equipment, "$.rings[? @.weight>5]") AS a FROM test`,
  499. data: &xsql.Tuple{
  500. Emitter: "test",
  501. Message: xsql.Message{
  502. "class": "warrior",
  503. "equipment": map[string]interface{}{
  504. "rings": []interface{}{
  505. map[string]interface{}{
  506. "name": "ring of despair",
  507. "weight": 0.1,
  508. }, map[string]interface{}{
  509. "name": "ring of strength",
  510. "weight": 2.4,
  511. },
  512. },
  513. "arm_right": "Sword of flame",
  514. "arm_left": "Shield of faith",
  515. },
  516. },
  517. },
  518. result: []map[string]interface{}{{
  519. "a": false,
  520. }},
  521. }, {
  522. sql: `SELECT json_path_exists(equipment, "$.ring1") AS a FROM test`,
  523. data: &xsql.Tuple{
  524. Emitter: "test",
  525. Message: xsql.Message{
  526. "class": "warrior",
  527. "equipment": map[string]interface{}{
  528. "rings": []interface{}{
  529. map[string]interface{}{
  530. "name": "ring of despair",
  531. "weight": 0.1,
  532. }, map[string]interface{}{
  533. "name": "ring of strength",
  534. "weight": 2.4,
  535. },
  536. },
  537. "arm_right": "Sword of flame",
  538. "arm_left": "Shield of faith",
  539. },
  540. },
  541. },
  542. result: []map[string]interface{}{{
  543. "a": false,
  544. }},
  545. }, {
  546. sql: `SELECT json_path_exists(equipment, "$.rings") AS a FROM test`,
  547. data: &xsql.Tuple{
  548. Emitter: "test",
  549. Message: xsql.Message{
  550. "class": "warrior",
  551. "equipment": map[string]interface{}{
  552. "rings": []interface{}{
  553. map[string]interface{}{
  554. "name": "ring of despair",
  555. "weight": 0.1,
  556. }, map[string]interface{}{
  557. "name": "ring of strength",
  558. "weight": 2.4,
  559. },
  560. },
  561. "arm_right": "Sword of flame",
  562. "arm_left": "Shield of faith",
  563. },
  564. },
  565. },
  566. result: []map[string]interface{}{{
  567. "a": true,
  568. }},
  569. }, {
  570. sql: `SELECT json_path_query(equipment, "$.rings[? (@.weight>1)].name") AS a FROM test`,
  571. data: &xsql.Tuple{
  572. Emitter: "test",
  573. Message: xsql.Message{
  574. "class": "warrior",
  575. "equipment": map[string]interface{}{
  576. "rings": []map[string]interface{}{
  577. {
  578. "name": "ring of despair",
  579. "weight": 0.1,
  580. }, {
  581. "name": "ring of strength",
  582. "weight": 2.4,
  583. },
  584. },
  585. "arm_right": "Sword of flame",
  586. "arm_left": "Shield of faith",
  587. },
  588. },
  589. },
  590. result: []map[string]interface{}{{
  591. "a": []interface{}{
  592. "ring of strength",
  593. },
  594. }},
  595. }, {
  596. sql: `SELECT json_path_query(equipment, "$.rings[*]") AS a FROM test`,
  597. data: &xsql.Tuple{
  598. Emitter: "test",
  599. Message: xsql.Message{
  600. "class": "warrior",
  601. "equipment": map[string]interface{}{
  602. "rings": []float64{
  603. 0.1, 2.4,
  604. },
  605. "arm_right": "Sword of flame",
  606. "arm_left": "Shield of faith",
  607. },
  608. },
  609. },
  610. result: []map[string]interface{}{{
  611. "a": []interface{}{
  612. 0.1, 2.4,
  613. },
  614. }},
  615. }, {
  616. sql: `SELECT json_path_query(equipment, "$.rings") AS a FROM test`,
  617. data: &xsql.Tuple{
  618. Emitter: "test",
  619. Message: xsql.Message{
  620. "class": "warrior",
  621. "equipment": map[string]interface{}{
  622. "rings": []float64{
  623. 0.1, 2.4,
  624. },
  625. "arm_right": "Sword of flame",
  626. "arm_left": "Shield of faith",
  627. },
  628. },
  629. },
  630. result: []map[string]interface{}{{
  631. "a": []interface{}{
  632. 0.1, 2.4,
  633. },
  634. }},
  635. }, {
  636. sql: `SELECT json_path_query(equipment, "$[0].rings[1]") AS a FROM test`,
  637. data: &xsql.Tuple{
  638. Emitter: "test",
  639. Message: xsql.Message{
  640. "class": "warrior",
  641. "equipment": []map[string]interface{}{
  642. {
  643. "rings": []float64{
  644. 0.1, 2.4,
  645. },
  646. "arm_right": "Sword of flame",
  647. "arm_left": "Shield of faith",
  648. },
  649. },
  650. },
  651. },
  652. result: []map[string]interface{}{{
  653. "a": 2.4,
  654. }},
  655. }, {
  656. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  657. data: &xsql.Tuple{
  658. Emitter: "test",
  659. Message: xsql.Message{
  660. "class": "warrior",
  661. "equipment": []map[string]interface{}{
  662. {
  663. "rings": []float64{
  664. 0.1, 2.4,
  665. },
  666. "arm.right": "Sword of flame",
  667. "arm.left": "Shield of faith",
  668. },
  669. },
  670. },
  671. },
  672. result: []map[string]interface{}{{
  673. "a": "Shield of faith",
  674. }},
  675. },
  676. }
  677. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  678. contextLogger := common.Log.WithField("rule", "TestJsonFunc_Apply1")
  679. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  680. for i, tt := range tests {
  681. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  682. if err != nil || stmt == nil {
  683. t.Errorf("parse sql %s error %v", tt.sql, err)
  684. }
  685. pp := &ProjectPlan{Fields: stmt.Fields}
  686. pp.isTest = true
  687. fv, afv := xsql.NewFunctionValuersForOp(nil)
  688. result := pp.Apply(ctx, tt.data, fv, afv)
  689. var mapRes []map[string]interface{}
  690. if v, ok := result.([]byte); ok {
  691. err := json.Unmarshal(v, &mapRes)
  692. if err != nil {
  693. t.Errorf("Failed to parse the input into map.\n")
  694. continue
  695. }
  696. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  697. if !reflect.DeepEqual(tt.result, mapRes) {
  698. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  699. }
  700. } else {
  701. t.Errorf("The returned result is not type of []byte but found %v", result)
  702. }
  703. }
  704. }