misc_func_test.go 21 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "reflect"
  22. "strings"
  23. "testing"
  24. )
  25. func TestMiscFunc_Apply1(t *testing.T) {
  26. var tests = []struct {
  27. sql string
  28. data *xsql.Tuple
  29. result []map[string]interface{}
  30. }{
  31. {
  32. sql: "SELECT md5(a) AS a FROM test",
  33. data: &xsql.Tuple{
  34. Emitter: "test",
  35. Message: xsql.Message{
  36. "a": "The quick brown fox jumps over the lazy dog",
  37. "b": "myb",
  38. "c": "myc",
  39. },
  40. },
  41. result: []map[string]interface{}{{
  42. "a": strings.ToLower("9E107D9D372BB6826BD81D3542A419D6"),
  43. }},
  44. },
  45. {
  46. sql: "SELECT sha1(a) AS a FROM test",
  47. data: &xsql.Tuple{
  48. Emitter: "test",
  49. Message: xsql.Message{
  50. "a": "The quick brown fox jumps over the lazy dog",
  51. "b": "myb",
  52. "c": "myc",
  53. },
  54. },
  55. result: []map[string]interface{}{{
  56. "a": strings.ToLower("2FD4E1C67A2D28FCED849EE1BB76E7391B93EB12"),
  57. }},
  58. },
  59. {
  60. sql: "SELECT sha256(a) AS a FROM test",
  61. data: &xsql.Tuple{
  62. Emitter: "test",
  63. Message: xsql.Message{
  64. "a": "The quick brown fox jumps over the lazy dog",
  65. "b": "myb",
  66. "c": "myc",
  67. },
  68. },
  69. result: []map[string]interface{}{{
  70. "a": strings.ToLower("D7A8FBB307D7809469CA9ABCB0082E4F8D5651E46D3CDB762D02D0BF37C9E592"),
  71. }},
  72. },
  73. {
  74. sql: "SELECT sha384(a) AS a FROM test",
  75. data: &xsql.Tuple{
  76. Emitter: "test",
  77. Message: xsql.Message{
  78. "a": "The quick brown fox jumps over the lazy dog",
  79. "b": "myb",
  80. "c": "myc",
  81. },
  82. },
  83. result: []map[string]interface{}{{
  84. "a": strings.ToLower("CA737F1014A48F4C0B6DD43CB177B0AFD9E5169367544C494011E3317DBF9A509CB1E5DC1E85A941BBEE3D7F2AFBC9B1"),
  85. }},
  86. },
  87. {
  88. sql: "SELECT sha512(a) AS a FROM test",
  89. data: &xsql.Tuple{
  90. Emitter: "test",
  91. Message: xsql.Message{
  92. "a": "The quick brown fox jumps over the lazy dog",
  93. "b": "myb",
  94. "c": "myc",
  95. },
  96. },
  97. result: []map[string]interface{}{{
  98. "a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
  99. }},
  100. },
  101. {
  102. sql: "SELECT mqtt(topic) AS a FROM test",
  103. data: &xsql.Tuple{
  104. Emitter: "test",
  105. Message: xsql.Message{},
  106. Metadata: xsql.Metadata{
  107. "topic": "devices/device_001/message",
  108. },
  109. },
  110. result: []map[string]interface{}{{
  111. "a": "devices/device_001/message",
  112. }},
  113. },
  114. {
  115. sql: "SELECT mqtt(topic) AS a FROM test",
  116. data: &xsql.Tuple{
  117. Emitter: "test",
  118. Message: xsql.Message{},
  119. Metadata: xsql.Metadata{
  120. "topic": "devices/device_001/message",
  121. },
  122. },
  123. result: []map[string]interface{}{{
  124. "a": "devices/device_001/message",
  125. }},
  126. },
  127. {
  128. sql: "SELECT topic, mqtt(topic) AS a FROM test",
  129. data: &xsql.Tuple{
  130. Emitter: "test",
  131. Message: xsql.Message{
  132. "topic": "fff",
  133. },
  134. Metadata: xsql.Metadata{
  135. "topic": "devices/device_001/message",
  136. },
  137. },
  138. result: []map[string]interface{}{{
  139. "topic": "fff",
  140. "a": "devices/device_001/message",
  141. }},
  142. },
  143. {
  144. sql: "SELECT cardinality(arr) as r FROM test",
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "temperature": 43.2,
  149. "arr": []int{},
  150. },
  151. },
  152. result: []map[string]interface{}{{
  153. "r": float64(0),
  154. }},
  155. },
  156. {
  157. sql: "SELECT cardinality(arr) as r FROM test",
  158. data: &xsql.Tuple{
  159. Emitter: "test",
  160. Message: xsql.Message{
  161. "temperature": 43.2,
  162. "arr": []int{1, 2, 3, 4, 5},
  163. },
  164. },
  165. result: []map[string]interface{}{{
  166. "r": float64(5),
  167. }},
  168. },
  169. {
  170. sql: "SELECT isNull(arr) as r FROM test",
  171. data: &xsql.Tuple{
  172. Emitter: "test",
  173. Message: xsql.Message{
  174. "temperature": 43.2,
  175. "arr": []int{},
  176. },
  177. },
  178. result: []map[string]interface{}{{
  179. "r": false,
  180. }},
  181. },
  182. {
  183. sql: "SELECT isNull(arr) as r FROM test",
  184. data: &xsql.Tuple{
  185. Emitter: "test",
  186. Message: xsql.Message{
  187. "temperature": 43.2,
  188. "arr": []float64(nil),
  189. },
  190. },
  191. result: []map[string]interface{}{{
  192. "r": true,
  193. }},
  194. },
  195. {
  196. sql: "SELECT isNull(rec) as r FROM test",
  197. data: &xsql.Tuple{
  198. Emitter: "test",
  199. Message: xsql.Message{
  200. "temperature": 43.2,
  201. "rec": map[string]interface{}(nil),
  202. },
  203. },
  204. result: []map[string]interface{}{{
  205. "r": true,
  206. }},
  207. },
  208. {
  209. sql: "SELECT cast(a * 1000, \"datetime\") AS a FROM test",
  210. data: &xsql.Tuple{
  211. Emitter: "test",
  212. Message: xsql.Message{
  213. "a": 1.62000273e+09,
  214. "b": "ya",
  215. "c": "myc",
  216. },
  217. },
  218. result: []map[string]interface{}{{
  219. "a": "2021-05-03T00:45:30Z",
  220. }},
  221. },
  222. }
  223. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  224. contextLogger := conf.Log.WithField("rule", "TestMiscFunc_Apply1")
  225. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  226. for i, tt := range tests {
  227. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  228. if err != nil || stmt == nil {
  229. t.Errorf("parse sql %s error %v", tt.sql, err)
  230. }
  231. pp := &ProjectOp{Fields: stmt.Fields}
  232. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  233. result := pp.Apply(ctx, tt.data, fv, afv)
  234. var mapRes []map[string]interface{}
  235. if v, ok := result.([]byte); ok {
  236. err := json.Unmarshal(v, &mapRes)
  237. if err != nil {
  238. t.Errorf("Failed to parse the input into map.\n")
  239. continue
  240. }
  241. if !reflect.DeepEqual(tt.result, mapRes) {
  242. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  243. }
  244. } else {
  245. t.Errorf("The returned result is not type of []byte\n")
  246. }
  247. }
  248. }
  249. func TestMqttFunc_Apply2(t *testing.T) {
  250. var tests = []struct {
  251. sql string
  252. data *xsql.JoinTupleSets
  253. result []map[string]interface{}
  254. }{
  255. {
  256. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
  257. data: &xsql.JoinTupleSets{
  258. Content: []xsql.JoinTuple{
  259. {
  260. Tuples: []xsql.Tuple{
  261. {Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  262. {Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  263. },
  264. },
  265. },
  266. },
  267. result: []map[string]interface{}{{
  268. "id1": "1",
  269. "a": "devices/type1/device001",
  270. "b": "devices/type2/device001",
  271. }},
  272. },
  273. }
  274. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  275. contextLogger := conf.Log.WithField("rule", "TestMqttFunc_Apply2")
  276. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  277. for i, tt := range tests {
  278. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  279. if err != nil || stmt == nil {
  280. t.Errorf("parse sql %s error %v", tt.sql, err)
  281. }
  282. pp := &ProjectOp{Fields: stmt.Fields}
  283. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  284. result := pp.Apply(ctx, tt.data, fv, afv)
  285. var mapRes []map[string]interface{}
  286. if v, ok := result.([]byte); ok {
  287. err := json.Unmarshal(v, &mapRes)
  288. if err != nil {
  289. t.Errorf("Failed to parse the input into map.\n")
  290. continue
  291. }
  292. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  293. if !reflect.DeepEqual(tt.result, mapRes) {
  294. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  295. }
  296. } else {
  297. t.Errorf("The returned result is not type of []byte\n")
  298. }
  299. }
  300. }
  301. func TestMetaFunc_Apply1(t *testing.T) {
  302. var tests = []struct {
  303. sql string
  304. data interface{}
  305. result interface{}
  306. }{
  307. {
  308. sql: "SELECT topic, meta(topic) AS a FROM test",
  309. data: &xsql.Tuple{
  310. Emitter: "test",
  311. Message: xsql.Message{
  312. "topic": "fff",
  313. },
  314. Metadata: xsql.Metadata{
  315. "topic": "devices/device_001/message",
  316. },
  317. },
  318. result: []map[string]interface{}{{
  319. "topic": "fff",
  320. "a": "devices/device_001/message",
  321. }},
  322. },
  323. {
  324. sql: "SELECT meta(device) as d, meta(temperature->device) as r FROM test",
  325. data: &xsql.Tuple{
  326. Emitter: "test",
  327. Message: xsql.Message{
  328. "temperature": 43.2,
  329. },
  330. Metadata: xsql.Metadata{
  331. "temperature": map[string]interface{}{
  332. "id": "dfadfasfas",
  333. "device": "device2",
  334. },
  335. "device": "gateway",
  336. },
  337. },
  338. result: []map[string]interface{}{{
  339. "d": "gateway",
  340. "r": "device2",
  341. }},
  342. },
  343. {
  344. sql: "SELECT meta(*) as r FROM test",
  345. data: &xsql.Tuple{
  346. Emitter: "test",
  347. Message: xsql.Message{
  348. "temperature": 43.2,
  349. },
  350. Metadata: xsql.Metadata{
  351. "temperature": map[string]interface{}{
  352. "id": "dfadfasfas",
  353. "device": "device2",
  354. },
  355. "device": "gateway",
  356. },
  357. },
  358. result: []map[string]interface{}{{
  359. "r": map[string]interface{}{
  360. "temperature": map[string]interface{}{
  361. "id": "dfadfasfas",
  362. "device": "device2",
  363. },
  364. "device": "gateway",
  365. },
  366. }},
  367. },
  368. {
  369. sql: "SELECT topic, meta(`Light-diming`->device) AS a FROM test",
  370. data: &xsql.Tuple{
  371. Emitter: "test",
  372. Message: xsql.Message{
  373. "topic": "fff",
  374. },
  375. Metadata: xsql.Metadata{
  376. "Light-diming": map[string]interface{}{
  377. "device": "device2",
  378. },
  379. },
  380. },
  381. result: []map[string]interface{}{{
  382. "topic": "fff",
  383. "a": "device2",
  384. }},
  385. },
  386. }
  387. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  388. contextLogger := conf.Log.WithField("rule", "TestMetaFunc_Apply1")
  389. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  390. for i, tt := range tests {
  391. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  392. if err != nil || stmt == nil {
  393. t.Errorf("parse sql %s error %v", tt.sql, err)
  394. }
  395. pp := &ProjectOp{Fields: stmt.Fields}
  396. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  397. result := pp.Apply(ctx, tt.data, fv, afv)
  398. var mapRes []map[string]interface{}
  399. if v, ok := result.([]byte); ok {
  400. err := json.Unmarshal(v, &mapRes)
  401. if err != nil {
  402. t.Errorf("Failed to parse the input into map.\n")
  403. continue
  404. }
  405. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  406. if !reflect.DeepEqual(tt.result, mapRes) {
  407. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  408. }
  409. } else {
  410. t.Errorf("The returned result is not type of []byte\n")
  411. }
  412. }
  413. }
  414. func TestJsonPathFunc_Apply1(t *testing.T) {
  415. var tests = []struct {
  416. sql string
  417. data interface{}
  418. result interface{}
  419. }{
  420. {
  421. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  422. data: &xsql.Tuple{
  423. Emitter: "test",
  424. Message: xsql.Message{
  425. "class": "warrior",
  426. "equipment": map[string]interface{}{
  427. "rings": []map[string]interface{}{
  428. {
  429. "name": "ring of despair",
  430. "weight": 0.1,
  431. }, {
  432. "name": "ring of strength",
  433. "weight": 2.4,
  434. },
  435. },
  436. "arm_right": "Sword of flame",
  437. "arm_left": "Shield of faith",
  438. },
  439. },
  440. },
  441. result: []map[string]interface{}{{
  442. "a": "Sword of flame",
  443. }},
  444. }, {
  445. sql: `SELECT json_path_query(equipment, "$.rings[*].weight") AS a FROM test`,
  446. data: &xsql.Tuple{
  447. Emitter: "test",
  448. Message: xsql.Message{
  449. "class": "warrior",
  450. "equipment": map[string]interface{}{
  451. "rings": []interface{}{
  452. map[string]interface{}{
  453. "name": "ring of despair",
  454. "weight": 0.1,
  455. }, map[string]interface{}{
  456. "name": "ring of strength",
  457. "weight": 2.4,
  458. },
  459. },
  460. "arm_right": "Sword of flame",
  461. "arm_left": "Shield of faith",
  462. },
  463. },
  464. },
  465. result: []map[string]interface{}{{
  466. "a": []interface{}{
  467. 0.1, 2.4,
  468. },
  469. }},
  470. }, {
  471. sql: `SELECT json_path_query_first(equipment, "$.rings[*].weight") AS a FROM test`,
  472. data: &xsql.Tuple{
  473. Emitter: "test",
  474. Message: xsql.Message{
  475. "class": "warrior",
  476. "equipment": map[string]interface{}{
  477. "rings": []interface{}{
  478. map[string]interface{}{
  479. "name": "ring of despair",
  480. "weight": 0.1,
  481. }, map[string]interface{}{
  482. "name": "ring of strength",
  483. "weight": 2.4,
  484. },
  485. },
  486. "arm_right": "Sword of flame",
  487. "arm_left": "Shield of faith",
  488. },
  489. },
  490. },
  491. result: []map[string]interface{}{{
  492. "a": 0.1,
  493. }},
  494. }, {
  495. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1]") AS a FROM test`,
  496. data: &xsql.Tuple{
  497. Emitter: "test",
  498. Message: xsql.Message{
  499. "class": "warrior",
  500. "equipment": map[string]interface{}{
  501. "rings": []interface{}{
  502. map[string]interface{}{
  503. "name": "ring of despair",
  504. "weight": 0.1,
  505. }, map[string]interface{}{
  506. "name": "ring of strength",
  507. "weight": 2.4,
  508. },
  509. },
  510. "arm_right": "Sword of flame",
  511. "arm_left": "Shield of faith",
  512. },
  513. },
  514. },
  515. result: []map[string]interface{}{{
  516. "a": []interface{}{
  517. map[string]interface{}{
  518. "name": "ring of strength",
  519. "weight": 2.4,
  520. },
  521. },
  522. }},
  523. }, {
  524. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1].name") AS a FROM test`,
  525. data: &xsql.Tuple{
  526. Emitter: "test",
  527. Message: xsql.Message{
  528. "class": "warrior",
  529. "equipment": map[string]interface{}{
  530. "rings": []interface{}{
  531. map[string]interface{}{
  532. "name": "ring of despair",
  533. "weight": 0.1,
  534. }, map[string]interface{}{
  535. "name": "ring of strength",
  536. "weight": 2.4,
  537. },
  538. },
  539. "arm_right": "Sword of flame",
  540. "arm_left": "Shield of faith",
  541. },
  542. },
  543. },
  544. result: []map[string]interface{}{{
  545. "a": []interface{}{
  546. "ring of strength",
  547. },
  548. }},
  549. }, {
  550. sql: `SELECT json_path_exists(equipment, "$.rings[? @.weight>5]") AS a FROM test`,
  551. data: &xsql.Tuple{
  552. Emitter: "test",
  553. Message: xsql.Message{
  554. "class": "warrior",
  555. "equipment": map[string]interface{}{
  556. "rings": []interface{}{
  557. map[string]interface{}{
  558. "name": "ring of despair",
  559. "weight": 0.1,
  560. }, map[string]interface{}{
  561. "name": "ring of strength",
  562. "weight": 2.4,
  563. },
  564. },
  565. "arm_right": "Sword of flame",
  566. "arm_left": "Shield of faith",
  567. },
  568. },
  569. },
  570. result: []map[string]interface{}{{
  571. "a": false,
  572. }},
  573. }, {
  574. sql: `SELECT json_path_exists(equipment, "$.ring1") AS a FROM test`,
  575. data: &xsql.Tuple{
  576. Emitter: "test",
  577. Message: xsql.Message{
  578. "class": "warrior",
  579. "equipment": map[string]interface{}{
  580. "rings": []interface{}{
  581. map[string]interface{}{
  582. "name": "ring of despair",
  583. "weight": 0.1,
  584. }, map[string]interface{}{
  585. "name": "ring of strength",
  586. "weight": 2.4,
  587. },
  588. },
  589. "arm_right": "Sword of flame",
  590. "arm_left": "Shield of faith",
  591. },
  592. },
  593. },
  594. result: []map[string]interface{}{{
  595. "a": false,
  596. }},
  597. }, {
  598. sql: `SELECT json_path_exists(equipment, "$.rings") AS a FROM test`,
  599. data: &xsql.Tuple{
  600. Emitter: "test",
  601. Message: xsql.Message{
  602. "class": "warrior",
  603. "equipment": map[string]interface{}{
  604. "rings": []interface{}{
  605. map[string]interface{}{
  606. "name": "ring of despair",
  607. "weight": 0.1,
  608. }, map[string]interface{}{
  609. "name": "ring of strength",
  610. "weight": 2.4,
  611. },
  612. },
  613. "arm_right": "Sword of flame",
  614. "arm_left": "Shield of faith",
  615. },
  616. },
  617. },
  618. result: []map[string]interface{}{{
  619. "a": true,
  620. }},
  621. }, {
  622. sql: `SELECT json_path_query(equipment, "$.rings[? (@.weight>1)].name") AS a FROM test`,
  623. data: &xsql.Tuple{
  624. Emitter: "test",
  625. Message: xsql.Message{
  626. "class": "warrior",
  627. "equipment": map[string]interface{}{
  628. "rings": []map[string]interface{}{
  629. {
  630. "name": "ring of despair",
  631. "weight": 0.1,
  632. }, {
  633. "name": "ring of strength",
  634. "weight": 2.4,
  635. },
  636. },
  637. "arm_right": "Sword of flame",
  638. "arm_left": "Shield of faith",
  639. },
  640. },
  641. },
  642. result: []map[string]interface{}{{
  643. "a": []interface{}{
  644. "ring of strength",
  645. },
  646. }},
  647. }, {
  648. sql: `SELECT json_path_query(equipment, "$.rings[*]") AS a FROM test`,
  649. data: &xsql.Tuple{
  650. Emitter: "test",
  651. Message: xsql.Message{
  652. "class": "warrior",
  653. "equipment": map[string]interface{}{
  654. "rings": []float64{
  655. 0.1, 2.4,
  656. },
  657. "arm_right": "Sword of flame",
  658. "arm_left": "Shield of faith",
  659. },
  660. },
  661. },
  662. result: []map[string]interface{}{{
  663. "a": []interface{}{
  664. 0.1, 2.4,
  665. },
  666. }},
  667. }, {
  668. sql: `SELECT json_path_query(equipment, "$.rings") AS a FROM test`,
  669. data: &xsql.Tuple{
  670. Emitter: "test",
  671. Message: xsql.Message{
  672. "class": "warrior",
  673. "equipment": map[string]interface{}{
  674. "rings": []float64{
  675. 0.1, 2.4,
  676. },
  677. "arm_right": "Sword of flame",
  678. "arm_left": "Shield of faith",
  679. },
  680. },
  681. },
  682. result: []map[string]interface{}{{
  683. "a": []interface{}{
  684. 0.1, 2.4,
  685. },
  686. }},
  687. }, {
  688. sql: `SELECT json_path_query(equipment, "$[0].rings[1]") AS a FROM test`,
  689. data: &xsql.Tuple{
  690. Emitter: "test",
  691. Message: xsql.Message{
  692. "class": "warrior",
  693. "equipment": []map[string]interface{}{
  694. {
  695. "rings": []float64{
  696. 0.1, 2.4,
  697. },
  698. "arm_right": "Sword of flame",
  699. "arm_left": "Shield of faith",
  700. },
  701. },
  702. },
  703. },
  704. result: []map[string]interface{}{{
  705. "a": 2.4,
  706. }},
  707. }, {
  708. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  709. data: &xsql.Tuple{
  710. Emitter: "test",
  711. Message: xsql.Message{
  712. "class": "warrior",
  713. "equipment": []map[string]interface{}{
  714. {
  715. "rings": []float64{
  716. 0.1, 2.4,
  717. },
  718. "arm.right": "Sword of flame",
  719. "arm.left": "Shield of faith",
  720. },
  721. },
  722. },
  723. },
  724. result: []map[string]interface{}{{
  725. "a": "Shield of faith",
  726. }},
  727. }, {
  728. sql: "SELECT json_path_query(equipment, \"$[\\\"arm.left\\\"]\") AS a FROM test",
  729. data: &xsql.Tuple{
  730. Emitter: "test",
  731. Message: xsql.Message{
  732. "class": "warrior",
  733. "equipment": `{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}`,
  734. },
  735. },
  736. result: []map[string]interface{}{{
  737. "a": "Shield of faith",
  738. }},
  739. }, {
  740. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  741. data: &xsql.Tuple{
  742. Emitter: "test",
  743. Message: xsql.Message{
  744. "class": "warrior",
  745. "equipment": `[{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}]`,
  746. },
  747. },
  748. result: []map[string]interface{}{{
  749. "a": "Shield of faith",
  750. }},
  751. }, {
  752. sql: `SELECT all[poi[-1] + 1]->ts as powerOnTs FROM test`,
  753. data: &xsql.Tuple{
  754. Emitter: "test",
  755. Message: xsql.Message{
  756. "all": []map[string]interface{}{
  757. {"SystemPowerMode": 0, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": float64(1), "ts": 0},
  758. {"SystemPowerMode": 0, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": float64(4), "ts": 500},
  759. {"SystemPowerMode": 2, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": 0, "ts": 1000},
  760. {"SystemPowerMode": 2, "VehicleSpeed": 10, "FLWdwPosition": 20, "FrontWiperSwitchStatus": 0, "ts": 60000},
  761. {"SystemPowerMode": 2, "VehicleSpeed": 10, "FLWdwPosition": 20, "FrontWiperSwitchStatus": 0, "ts": 89500},
  762. {"SystemPowerMode": 2, "VehicleSpeed": 20, "FLWdwPosition": 50, "FrontWiperSwitchStatus": 5, "ts": 90000},
  763. {"SystemPowerMode": 2, "VehicleSpeed": 40, "FLWdwPosition": 60, "FrontWiperSwitchStatus": 5, "ts": 121000},
  764. },
  765. "poi": []interface{}{0, 1},
  766. },
  767. },
  768. result: []map[string]interface{}{{
  769. "powerOnTs": float64(1000),
  770. }},
  771. },
  772. }
  773. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  774. contextLogger := conf.Log.WithField("rule", "TestJsonFunc_Apply1")
  775. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  776. for i, tt := range tests {
  777. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  778. if err != nil || stmt == nil {
  779. t.Errorf("parse sql %s error %v", tt.sql, err)
  780. }
  781. pp := &ProjectOp{Fields: stmt.Fields}
  782. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  783. result := pp.Apply(ctx, tt.data, fv, afv)
  784. var mapRes []map[string]interface{}
  785. if v, ok := result.([]byte); ok {
  786. err := json.Unmarshal(v, &mapRes)
  787. if err != nil {
  788. t.Errorf("Failed to parse the input into map.\n")
  789. continue
  790. }
  791. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  792. if !reflect.DeepEqual(tt.result, mapRes) {
  793. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  794. }
  795. } else {
  796. t.Errorf("The returned result is not type of []byte but found %v", result)
  797. }
  798. }
  799. }