misc_func_test.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/testx"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "github.com/lf-edge/ekuiper/internal/topo/state"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. "reflect"
  25. "strconv"
  26. "strings"
  27. "testing"
  28. )
  29. func TestMiscFunc_Apply1(t *testing.T) {
  30. var tests = []struct {
  31. sql string
  32. data *xsql.Tuple
  33. result []map[string]interface{}
  34. }{
  35. {
  36. sql: "SELECT md5(a) AS a FROM test",
  37. data: &xsql.Tuple{
  38. Emitter: "test",
  39. Message: xsql.Message{
  40. "a": "The quick brown fox jumps over the lazy dog",
  41. "b": "myb",
  42. "c": "myc",
  43. },
  44. },
  45. result: []map[string]interface{}{{
  46. "a": strings.ToLower("9E107D9D372BB6826BD81D3542A419D6"),
  47. }},
  48. },
  49. {
  50. sql: "SELECT md5(d) AS a FROM test",
  51. data: &xsql.Tuple{
  52. Emitter: "test",
  53. Message: xsql.Message{
  54. "a": "The quick brown fox jumps over the lazy dog",
  55. "b": "myb",
  56. "c": "myc",
  57. },
  58. },
  59. result: []map[string]interface{}{{}},
  60. },
  61. {
  62. sql: "SELECT sha1(a) AS a FROM test",
  63. data: &xsql.Tuple{
  64. Emitter: "test",
  65. Message: xsql.Message{
  66. "a": "The quick brown fox jumps over the lazy dog",
  67. "b": "myb",
  68. "c": "myc",
  69. },
  70. },
  71. result: []map[string]interface{}{{
  72. "a": strings.ToLower("2FD4E1C67A2D28FCED849EE1BB76E7391B93EB12"),
  73. }},
  74. },
  75. {
  76. sql: "SELECT sha256(a) AS a FROM test",
  77. data: &xsql.Tuple{
  78. Emitter: "test",
  79. Message: xsql.Message{
  80. "a": "The quick brown fox jumps over the lazy dog",
  81. "b": "myb",
  82. "c": "myc",
  83. },
  84. },
  85. result: []map[string]interface{}{{
  86. "a": strings.ToLower("D7A8FBB307D7809469CA9ABCB0082E4F8D5651E46D3CDB762D02D0BF37C9E592"),
  87. }},
  88. },
  89. {
  90. sql: "SELECT sha384(a) AS a FROM test",
  91. data: &xsql.Tuple{
  92. Emitter: "test",
  93. Message: xsql.Message{
  94. "a": "The quick brown fox jumps over the lazy dog",
  95. "b": "myb",
  96. "c": "myc",
  97. },
  98. },
  99. result: []map[string]interface{}{{
  100. "a": strings.ToLower("CA737F1014A48F4C0B6DD43CB177B0AFD9E5169367544C494011E3317DBF9A509CB1E5DC1E85A941BBEE3D7F2AFBC9B1"),
  101. }},
  102. },
  103. {
  104. sql: "SELECT sha512(a) AS a FROM test",
  105. data: &xsql.Tuple{
  106. Emitter: "test",
  107. Message: xsql.Message{
  108. "a": "The quick brown fox jumps over the lazy dog",
  109. "b": "myb",
  110. "c": "myc",
  111. },
  112. },
  113. result: []map[string]interface{}{{
  114. "a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
  115. }},
  116. },
  117. {
  118. sql: "SELECT mqtt(topic) AS a FROM test",
  119. data: &xsql.Tuple{
  120. Emitter: "test",
  121. Message: xsql.Message{},
  122. Metadata: xsql.Metadata{
  123. "topic": "devices/device_001/message",
  124. },
  125. },
  126. result: []map[string]interface{}{{
  127. "a": "devices/device_001/message",
  128. }},
  129. },
  130. {
  131. sql: "SELECT mqtt(topic) AS a FROM test",
  132. data: &xsql.Tuple{
  133. Emitter: "test",
  134. Message: xsql.Message{},
  135. Metadata: xsql.Metadata{
  136. "topic": "devices/device_001/message",
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "a": "devices/device_001/message",
  141. }},
  142. },
  143. {
  144. sql: "SELECT topic, mqtt(topic) AS a FROM test",
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "topic": "fff",
  149. },
  150. Metadata: xsql.Metadata{
  151. "topic": "devices/device_001/message",
  152. },
  153. },
  154. result: []map[string]interface{}{{
  155. "topic": "fff",
  156. "a": "devices/device_001/message",
  157. }},
  158. },
  159. {
  160. sql: "SELECT cardinality(arr) as r FROM test",
  161. data: &xsql.Tuple{
  162. Emitter: "test",
  163. Message: xsql.Message{
  164. "temperature": 43.2,
  165. "arr": []int{},
  166. },
  167. },
  168. result: []map[string]interface{}{{
  169. "r": 0,
  170. }},
  171. },
  172. {
  173. sql: "SELECT cardinality(arr) as r FROM test",
  174. data: &xsql.Tuple{
  175. Emitter: "test",
  176. Message: xsql.Message{
  177. "temperature": 43.2,
  178. "arr": []int{1, 2, 3, 4, 5},
  179. },
  180. },
  181. result: []map[string]interface{}{{
  182. "r": 5,
  183. }},
  184. },
  185. {
  186. sql: "SELECT isNull(arr) as r FROM test",
  187. data: &xsql.Tuple{
  188. Emitter: "test",
  189. Message: xsql.Message{
  190. "temperature": 43.2,
  191. "arr": []int{},
  192. },
  193. },
  194. result: []map[string]interface{}{{
  195. "r": false,
  196. }},
  197. },
  198. {
  199. sql: "SELECT isNull(arr) as r FROM test",
  200. data: &xsql.Tuple{
  201. Emitter: "test",
  202. Message: xsql.Message{
  203. "temperature": 43.2,
  204. "arr": []float64(nil),
  205. },
  206. },
  207. result: []map[string]interface{}{{
  208. "r": true,
  209. }},
  210. },
  211. {
  212. sql: "SELECT isNull(rec) as r FROM test",
  213. data: &xsql.Tuple{
  214. Emitter: "test",
  215. Message: xsql.Message{
  216. "temperature": 43.2,
  217. "rec": map[string]interface{}(nil),
  218. },
  219. },
  220. result: []map[string]interface{}{{
  221. "r": true,
  222. }},
  223. },
  224. {
  225. sql: "SELECT cast(a * 1000, \"datetime\") AS a FROM test",
  226. data: &xsql.Tuple{
  227. Emitter: "test",
  228. Message: xsql.Message{
  229. "a": 1.62000273e+09,
  230. "b": "ya",
  231. "c": "myc",
  232. },
  233. },
  234. result: []map[string]interface{}{{
  235. "a": cast.TimeFromUnixMilli(1.62000273e+12),
  236. }},
  237. },
  238. }
  239. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  240. contextLogger := conf.Log.WithField("rule", "TestMiscFunc_Apply1")
  241. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  242. for i, tt := range tests {
  243. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  244. if err != nil || stmt == nil {
  245. t.Errorf("parse sql %s error %v", tt.sql, err)
  246. }
  247. pp := &ProjectOp{}
  248. parseStmt(pp, stmt.Fields)
  249. fv, afv := xsql.NewFunctionValuersForOp(nil)
  250. opResult := pp.Apply(ctx, tt.data, fv, afv)
  251. result, err := parseResult(opResult, pp.IsAggregate)
  252. if err != nil {
  253. t.Errorf("parse result error: %s", err)
  254. continue
  255. }
  256. if !reflect.DeepEqual(tt.result, result) {
  257. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  258. }
  259. }
  260. }
  261. func TestMqttFunc_Apply2(t *testing.T) {
  262. var tests = []struct {
  263. sql string
  264. data *xsql.JoinTuples
  265. result []map[string]interface{}
  266. }{
  267. {
  268. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
  269. data: &xsql.JoinTuples{
  270. Content: []*xsql.JoinTuple{
  271. {
  272. Tuples: []xsql.TupleRow{
  273. &xsql.Tuple{Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  274. &xsql.Tuple{Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  275. },
  276. },
  277. },
  278. },
  279. result: []map[string]interface{}{{
  280. "id1": "1",
  281. "a": "devices/type1/device001",
  282. "b": "devices/type2/device001",
  283. }},
  284. },
  285. }
  286. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  287. contextLogger := conf.Log.WithField("rule", "TestMqttFunc_Apply2")
  288. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  289. for i, tt := range tests {
  290. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  291. if err != nil || stmt == nil {
  292. t.Errorf("parse sql %s error %v", tt.sql, err)
  293. }
  294. pp := &ProjectOp{}
  295. parseStmt(pp, stmt.Fields)
  296. fv, afv := xsql.NewFunctionValuersForOp(nil)
  297. opResult := pp.Apply(ctx, tt.data, fv, afv)
  298. result, err := parseResult(opResult, pp.IsAggregate)
  299. if err != nil {
  300. t.Errorf("parse result error: %s", err)
  301. continue
  302. }
  303. if !reflect.DeepEqual(tt.result, result) {
  304. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  305. }
  306. }
  307. }
  308. func TestMetaFunc_Apply1(t *testing.T) {
  309. var tests = []struct {
  310. sql string
  311. data interface{}
  312. result interface{}
  313. }{
  314. {
  315. sql: "SELECT topic, meta(topic) AS a FROM test",
  316. data: &xsql.Tuple{
  317. Emitter: "test",
  318. Message: xsql.Message{
  319. "topic": "fff",
  320. },
  321. Metadata: xsql.Metadata{
  322. "topic": "devices/device_001/message",
  323. },
  324. },
  325. result: []map[string]interface{}{{
  326. "topic": "fff",
  327. "a": "devices/device_001/message",
  328. }},
  329. },
  330. {
  331. sql: "SELECT meta(device) as d, meta(temperature->device) as r FROM test",
  332. data: &xsql.Tuple{
  333. Emitter: "test",
  334. Message: xsql.Message{
  335. "temperature": 43.2,
  336. },
  337. Metadata: xsql.Metadata{
  338. "temperature": map[string]interface{}{
  339. "id": "dfadfasfas",
  340. "device": "device2",
  341. },
  342. "device": "gateway",
  343. },
  344. },
  345. result: []map[string]interface{}{{
  346. "d": "gateway",
  347. "r": "device2",
  348. }},
  349. },
  350. {
  351. sql: "SELECT meta(*) as r FROM test",
  352. data: &xsql.Tuple{
  353. Emitter: "test",
  354. Message: xsql.Message{
  355. "temperature": 43.2,
  356. },
  357. Metadata: xsql.Metadata{
  358. "temperature": map[string]interface{}{
  359. "id": "dfadfasfas",
  360. "device": "device2",
  361. },
  362. "device": "gateway",
  363. },
  364. },
  365. result: []map[string]interface{}{{
  366. "r": map[string]interface{}{
  367. "temperature": map[string]interface{}{
  368. "id": "dfadfasfas",
  369. "device": "device2",
  370. },
  371. "device": "gateway",
  372. },
  373. }},
  374. },
  375. {
  376. sql: "SELECT topic, meta(`Light-diming`->device) AS a FROM test",
  377. data: &xsql.Tuple{
  378. Emitter: "test",
  379. Message: xsql.Message{
  380. "topic": "fff",
  381. },
  382. Metadata: xsql.Metadata{
  383. "Light-diming": map[string]interface{}{
  384. "device": "device2",
  385. },
  386. },
  387. },
  388. result: []map[string]interface{}{{
  389. "topic": "fff",
  390. "a": "device2",
  391. }},
  392. },
  393. }
  394. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  395. contextLogger := conf.Log.WithField("rule", "TestMetaFunc_Apply1")
  396. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  397. for i, tt := range tests {
  398. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  399. if err != nil || stmt == nil {
  400. t.Errorf("parse sql %s error %v", tt.sql, err)
  401. }
  402. pp := &ProjectOp{}
  403. parseStmt(pp, stmt.Fields)
  404. fv, afv := xsql.NewFunctionValuersForOp(nil)
  405. opResult := pp.Apply(ctx, tt.data, fv, afv)
  406. result, err := parseResult(opResult, pp.IsAggregate)
  407. if err != nil {
  408. t.Errorf("parse result error: %s", err)
  409. continue
  410. }
  411. if !reflect.DeepEqual(tt.result, result) {
  412. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  413. }
  414. }
  415. }
  416. func TestJsonPathFunc_Apply1(t *testing.T) {
  417. var tests = []struct {
  418. sql string
  419. data interface{}
  420. result interface{}
  421. err string
  422. }{
  423. {
  424. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  425. data: &xsql.Tuple{
  426. Emitter: "test",
  427. Message: xsql.Message{
  428. "class": "warrior",
  429. "equipment": map[string]interface{}{
  430. "rings": []map[string]interface{}{
  431. {
  432. "name": "ring of despair",
  433. "weight": 0.1,
  434. }, {
  435. "name": "ring of strength",
  436. "weight": 2.4,
  437. },
  438. },
  439. "arm_right": "Sword of flame",
  440. "arm_left": "Shield of faith",
  441. },
  442. },
  443. },
  444. result: []map[string]interface{}{{
  445. "a": "Sword of flame",
  446. }},
  447. }, {
  448. sql: `SELECT json_path_query(equipment, "$.rings[*].weight") AS a FROM test`,
  449. data: &xsql.Tuple{
  450. Emitter: "test",
  451. Message: xsql.Message{
  452. "class": "warrior",
  453. "equipment": map[string]interface{}{
  454. "rings": []interface{}{
  455. map[string]interface{}{
  456. "name": "ring of despair",
  457. "weight": 0.1,
  458. }, map[string]interface{}{
  459. "name": "ring of strength",
  460. "weight": 2.4,
  461. },
  462. },
  463. "arm_right": "Sword of flame",
  464. "arm_left": "Shield of faith",
  465. },
  466. },
  467. },
  468. result: []map[string]interface{}{{
  469. "a": []interface{}{
  470. 0.1, 2.4,
  471. },
  472. }},
  473. }, {
  474. sql: `SELECT json_path_query_first(equipment, "$.rings[*].weight") AS a FROM test`,
  475. data: &xsql.Tuple{
  476. Emitter: "test",
  477. Message: xsql.Message{
  478. "class": "warrior",
  479. "equipment": map[string]interface{}{
  480. "rings": []interface{}{
  481. map[string]interface{}{
  482. "name": "ring of despair",
  483. "weight": 0.1,
  484. }, map[string]interface{}{
  485. "name": "ring of strength",
  486. "weight": 2.4,
  487. },
  488. },
  489. "arm_right": "Sword of flame",
  490. "arm_left": "Shield of faith",
  491. },
  492. },
  493. },
  494. result: []map[string]interface{}{{
  495. "a": 0.1,
  496. }},
  497. }, {
  498. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1]") AS a FROM test`,
  499. data: &xsql.Tuple{
  500. Emitter: "test",
  501. Message: xsql.Message{
  502. "class": "warrior",
  503. "equipment": map[string]interface{}{
  504. "rings": []interface{}{
  505. map[string]interface{}{
  506. "name": "ring of despair",
  507. "weight": 0.1,
  508. }, map[string]interface{}{
  509. "name": "ring of strength",
  510. "weight": 2.4,
  511. },
  512. },
  513. "arm_right": "Sword of flame",
  514. "arm_left": "Shield of faith",
  515. },
  516. },
  517. },
  518. result: []map[string]interface{}{{
  519. "a": []interface{}{
  520. map[string]interface{}{
  521. "name": "ring of strength",
  522. "weight": 2.4,
  523. },
  524. },
  525. }},
  526. }, {
  527. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1].name") AS a FROM test`,
  528. data: &xsql.Tuple{
  529. Emitter: "test",
  530. Message: xsql.Message{
  531. "class": "warrior",
  532. "equipment": map[string]interface{}{
  533. "rings": []interface{}{
  534. map[string]interface{}{
  535. "name": "ring of despair",
  536. "weight": 0.1,
  537. }, map[string]interface{}{
  538. "name": "ring of strength",
  539. "weight": 2.4,
  540. },
  541. },
  542. "arm_right": "Sword of flame",
  543. "arm_left": "Shield of faith",
  544. },
  545. },
  546. },
  547. result: []map[string]interface{}{{
  548. "a": []interface{}{
  549. "ring of strength",
  550. },
  551. }},
  552. }, {
  553. sql: `SELECT json_path_exists(equipment, "$.rings[? @.weight>5]") AS a FROM test`,
  554. data: &xsql.Tuple{
  555. Emitter: "test",
  556. Message: xsql.Message{
  557. "class": "warrior",
  558. "equipment": map[string]interface{}{
  559. "rings": []interface{}{
  560. map[string]interface{}{
  561. "name": "ring of despair",
  562. "weight": 0.1,
  563. }, map[string]interface{}{
  564. "name": "ring of strength",
  565. "weight": 2.4,
  566. },
  567. },
  568. "arm_right": "Sword of flame",
  569. "arm_left": "Shield of faith",
  570. },
  571. },
  572. },
  573. result: []map[string]interface{}{{
  574. "a": false,
  575. }},
  576. }, {
  577. sql: `SELECT json_path_exists(equipment, "$.ring1") AS a FROM test`,
  578. data: &xsql.Tuple{
  579. Emitter: "test",
  580. Message: xsql.Message{
  581. "class": "warrior",
  582. "equipment": map[string]interface{}{
  583. "rings": []interface{}{
  584. map[string]interface{}{
  585. "name": "ring of despair",
  586. "weight": 0.1,
  587. }, map[string]interface{}{
  588. "name": "ring of strength",
  589. "weight": 2.4,
  590. },
  591. },
  592. "arm_right": "Sword of flame",
  593. "arm_left": "Shield of faith",
  594. },
  595. },
  596. },
  597. result: []map[string]interface{}{{
  598. "a": false,
  599. }},
  600. }, {
  601. sql: `SELECT json_path_exists(equipment, "$.rings") AS a FROM test`,
  602. data: &xsql.Tuple{
  603. Emitter: "test",
  604. Message: xsql.Message{
  605. "class": "warrior",
  606. "equipment": map[string]interface{}{
  607. "rings": []interface{}{
  608. map[string]interface{}{
  609. "name": "ring of despair",
  610. "weight": 0.1,
  611. }, map[string]interface{}{
  612. "name": "ring of strength",
  613. "weight": 2.4,
  614. },
  615. },
  616. "arm_right": "Sword of flame",
  617. "arm_left": "Shield of faith",
  618. },
  619. },
  620. },
  621. result: []map[string]interface{}{{
  622. "a": true,
  623. }},
  624. }, {
  625. sql: `SELECT json_path_query(equipment, "$.rings[? (@.weight>1)].name") AS a FROM test`,
  626. data: &xsql.Tuple{
  627. Emitter: "test",
  628. Message: xsql.Message{
  629. "class": "warrior",
  630. "equipment": map[string]interface{}{
  631. "rings": []map[string]interface{}{
  632. {
  633. "name": "ring of despair",
  634. "weight": 0.1,
  635. }, {
  636. "name": "ring of strength",
  637. "weight": 2.4,
  638. },
  639. },
  640. "arm_right": "Sword of flame",
  641. "arm_left": "Shield of faith",
  642. },
  643. },
  644. },
  645. result: []map[string]interface{}{{
  646. "a": []interface{}{
  647. "ring of strength",
  648. },
  649. }},
  650. }, {
  651. sql: `SELECT json_path_query(equipment, "$.rings[*]") AS a FROM test`,
  652. data: &xsql.Tuple{
  653. Emitter: "test",
  654. Message: xsql.Message{
  655. "class": "warrior",
  656. "equipment": map[string]interface{}{
  657. "rings": []float64{
  658. 0.1, 2.4,
  659. },
  660. "arm_right": "Sword of flame",
  661. "arm_left": "Shield of faith",
  662. },
  663. },
  664. },
  665. result: []map[string]interface{}{{
  666. "a": []interface{}{
  667. 0.1, 2.4,
  668. },
  669. }},
  670. }, {
  671. sql: `SELECT json_path_query(equipment, "$.rings") AS a FROM test`,
  672. data: &xsql.Tuple{
  673. Emitter: "test",
  674. Message: xsql.Message{
  675. "class": "warrior",
  676. "equipment": map[string]interface{}{
  677. "rings": []float64{
  678. 0.1, 2.4,
  679. },
  680. "arm_right": "Sword of flame",
  681. "arm_left": "Shield of faith",
  682. },
  683. },
  684. },
  685. result: []map[string]interface{}{{
  686. "a": []interface{}{
  687. 0.1, 2.4,
  688. },
  689. }},
  690. }, {
  691. sql: `SELECT json_path_query(equipment, "$[0].rings[1]") AS a FROM test`,
  692. data: &xsql.Tuple{
  693. Emitter: "test",
  694. Message: xsql.Message{
  695. "class": "warrior",
  696. "equipment": []map[string]interface{}{
  697. {
  698. "rings": []float64{
  699. 0.1, 2.4,
  700. },
  701. "arm_right": "Sword of flame",
  702. "arm_left": "Shield of faith",
  703. },
  704. },
  705. },
  706. },
  707. result: []map[string]interface{}{{
  708. "a": 2.4,
  709. }},
  710. }, {
  711. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  712. data: &xsql.Tuple{
  713. Emitter: "test",
  714. Message: xsql.Message{
  715. "class": "warrior",
  716. "equipment": []map[string]interface{}{
  717. {
  718. "rings": []float64{
  719. 0.1, 2.4,
  720. },
  721. "arm.right": "Sword of flame",
  722. "arm.left": "Shield of faith",
  723. },
  724. },
  725. },
  726. },
  727. result: []map[string]interface{}{{
  728. "a": "Shield of faith",
  729. }},
  730. }, {
  731. sql: "SELECT json_path_query(equipment, \"$[\\\"arm.left\\\"]\") AS a FROM test",
  732. data: &xsql.Tuple{
  733. Emitter: "test",
  734. Message: xsql.Message{
  735. "class": "warrior",
  736. "equipment": `{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}`,
  737. },
  738. },
  739. result: []map[string]interface{}{{
  740. "a": "Shield of faith",
  741. }},
  742. }, {
  743. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  744. data: &xsql.Tuple{
  745. Emitter: "test",
  746. Message: xsql.Message{
  747. "class": "warrior",
  748. "equipment": `[{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}]`,
  749. },
  750. },
  751. result: []map[string]interface{}{{
  752. "a": "Shield of faith",
  753. }},
  754. }, {
  755. sql: `SELECT all[poi[-1] + 1]->ts as powerOnTs FROM test`,
  756. data: &xsql.Tuple{
  757. Emitter: "test",
  758. Message: xsql.Message{
  759. "all": []map[string]interface{}{
  760. {"SystemPowerMode": 0, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": float64(1), "ts": 0},
  761. {"SystemPowerMode": 0, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": float64(4), "ts": 500},
  762. {"SystemPowerMode": 2, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": 0, "ts": 1000},
  763. {"SystemPowerMode": 2, "VehicleSpeed": 10, "FLWdwPosition": 20, "FrontWiperSwitchStatus": 0, "ts": 60000},
  764. {"SystemPowerMode": 2, "VehicleSpeed": 10, "FLWdwPosition": 20, "FrontWiperSwitchStatus": 0, "ts": 89500},
  765. {"SystemPowerMode": 2, "VehicleSpeed": 20, "FLWdwPosition": 50, "FrontWiperSwitchStatus": 5, "ts": 90000},
  766. {"SystemPowerMode": 2, "VehicleSpeed": 40, "FLWdwPosition": 60, "FrontWiperSwitchStatus": 5, "ts": 121000},
  767. },
  768. "poi": []interface{}{0, 1},
  769. },
  770. },
  771. result: []map[string]interface{}{{
  772. "powerOnTs": 1000,
  773. }},
  774. }, {
  775. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  776. data: &xsql.Tuple{
  777. Emitter: "test",
  778. Message: xsql.Message{
  779. "class": "warrior",
  780. "equipment2": map[string]interface{}{
  781. "rings": []map[string]interface{}{
  782. {
  783. "name": "ring of despair",
  784. "weight": 0.1,
  785. }, {
  786. "name": "ring of strength",
  787. "weight": 2.4,
  788. },
  789. },
  790. "arm_right": "Sword of flame",
  791. "arm_left": "Shield of faith",
  792. },
  793. },
  794. },
  795. err: "run Select error: call func json_path_query error: invalid data nil for jsonpath",
  796. },
  797. }
  798. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  799. contextLogger := conf.Log.WithField("rule", "TestJsonFunc_Apply1")
  800. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  801. for i, tt := range tests {
  802. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  803. if err != nil || stmt == nil {
  804. t.Errorf("parse sql %s error %v", tt.sql, err)
  805. }
  806. pp := &ProjectOp{}
  807. parseStmt(pp, stmt.Fields)
  808. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  809. opResult := pp.Apply(ctx, tt.data, fv, afv)
  810. if rt, ok := opResult.(error); ok {
  811. if tt.err == "" {
  812. t.Errorf("%d: got error:\n exp=%s\n got=%s\n\n", i, tt.result, rt)
  813. } else if !reflect.DeepEqual(tt.err, testx.Errstring(rt)) {
  814. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.err, rt)
  815. }
  816. } else {
  817. result, _ := parseResult(opResult, pp.IsAggregate)
  818. if tt.err == "" {
  819. if !reflect.DeepEqual(tt.result, result) {
  820. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  821. }
  822. } else {
  823. t.Errorf("%d: invalid result:\n exp error %s\n got=%s\n\n", i, tt.err, result)
  824. }
  825. }
  826. }
  827. }
  828. func TestChangedFuncs_Apply1(t *testing.T) {
  829. var tests = []struct {
  830. sql string
  831. data []interface{}
  832. result [][]map[string]interface{}
  833. }{
  834. {
  835. sql: `SELECT changed_col(true, a), b FROM test`,
  836. data: []interface{}{
  837. &xsql.Tuple{
  838. Emitter: "test",
  839. Message: xsql.Message{
  840. "a": "a1",
  841. "b": "b1",
  842. "c": "c1",
  843. },
  844. },
  845. &xsql.Tuple{
  846. Emitter: "test",
  847. Message: xsql.Message{
  848. "a": "a1",
  849. "b": "b2",
  850. "c": "c1",
  851. },
  852. },
  853. &xsql.Tuple{
  854. Emitter: "test",
  855. Message: xsql.Message{
  856. "a": "a1",
  857. "c": "c1",
  858. },
  859. },
  860. &xsql.Tuple{
  861. Emitter: "test",
  862. Message: xsql.Message{
  863. "a": "a1",
  864. "b": "b2",
  865. "c": "c2",
  866. },
  867. },
  868. },
  869. result: [][]map[string]interface{}{{{
  870. "changed_col": "a1",
  871. "b": "b1",
  872. }}, {{
  873. "b": "b2",
  874. }}, {{}}, {{
  875. "b": "b2",
  876. }}},
  877. }, {
  878. sql: `SELECT changed_col(true, *) FROM test`,
  879. data: []interface{}{
  880. &xsql.Tuple{
  881. Emitter: "test",
  882. Message: xsql.Message{
  883. "a": "a1",
  884. "b": "b1",
  885. },
  886. },
  887. &xsql.Tuple{
  888. Emitter: "test",
  889. Message: xsql.Message{
  890. "a": "a1",
  891. "c": "c1",
  892. },
  893. },
  894. &xsql.Tuple{
  895. Emitter: "test",
  896. Message: xsql.Message{
  897. "a": "a1",
  898. "c": "c1",
  899. },
  900. },
  901. &xsql.Tuple{
  902. Emitter: "test",
  903. Message: xsql.Message{
  904. "a": "a1",
  905. "b": "b2",
  906. "c": "c2",
  907. },
  908. },
  909. },
  910. result: [][]map[string]interface{}{{{
  911. "changed_col": xsql.Message{
  912. "a": "a1",
  913. "b": "b1",
  914. },
  915. }}, {{
  916. "changed_col": xsql.Message{
  917. "a": "a1",
  918. "c": "c1",
  919. },
  920. }}, {{}}, {{
  921. "changed_col": xsql.Message{
  922. "a": "a1",
  923. "b": "b2",
  924. "c": "c2",
  925. },
  926. }}},
  927. },
  928. }
  929. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  930. contextLogger := conf.Log.WithField("rule", "TestChangedFuncs_Apply1")
  931. for i, tt := range tests {
  932. tempStore, _ := state.CreateStore("mockRule"+strconv.Itoa(i), api.AtMostOnce)
  933. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta("mockRule"+strconv.Itoa(i), "project", tempStore)
  934. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  935. if err != nil || stmt == nil {
  936. t.Errorf("parse sql %s error %v", tt.sql, err)
  937. }
  938. pp := &ProjectOp{}
  939. parseStmt(pp, stmt.Fields)
  940. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  941. r := make([][]map[string]interface{}, 0, len(tt.data))
  942. for _, d := range tt.data {
  943. opResult := pp.Apply(ctx, d, fv, afv)
  944. result, err := parseResult(opResult, pp.IsAggregate)
  945. if err != nil {
  946. t.Errorf("parse result error: %s", err)
  947. continue
  948. }
  949. r = append(r, result)
  950. }
  951. if !reflect.DeepEqual(tt.result, r) {
  952. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
  953. }
  954. }
  955. }