misc_func_test.go 28 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "reflect"
  18. "strconv"
  19. "strings"
  20. "testing"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/testx"
  23. "github.com/lf-edge/ekuiper/internal/topo/context"
  24. "github.com/lf-edge/ekuiper/internal/topo/state"
  25. "github.com/lf-edge/ekuiper/internal/xsql"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. "github.com/lf-edge/ekuiper/pkg/cast"
  28. )
  29. func TestMiscFunc_Apply1(t *testing.T) {
  30. tests := []struct {
  31. sql string
  32. data *xsql.Tuple
  33. result []map[string]interface{}
  34. }{
  35. {
  36. sql: "SELECT md5(a) AS a FROM test",
  37. data: &xsql.Tuple{
  38. Emitter: "test",
  39. Message: xsql.Message{
  40. "a": "The quick brown fox jumps over the lazy dog",
  41. "b": "myb",
  42. "c": "myc",
  43. },
  44. },
  45. result: []map[string]interface{}{{
  46. "a": strings.ToLower("9E107D9D372BB6826BD81D3542A419D6"),
  47. }},
  48. },
  49. {
  50. sql: "SELECT md5(d) AS a FROM test",
  51. data: &xsql.Tuple{
  52. Emitter: "test",
  53. Message: xsql.Message{
  54. "a": "The quick brown fox jumps over the lazy dog",
  55. "b": "myb",
  56. "c": "myc",
  57. },
  58. },
  59. result: []map[string]interface{}{{}},
  60. },
  61. {
  62. sql: "SELECT sha1(a) AS a FROM test",
  63. data: &xsql.Tuple{
  64. Emitter: "test",
  65. Message: xsql.Message{
  66. "a": "The quick brown fox jumps over the lazy dog",
  67. "b": "myb",
  68. "c": "myc",
  69. },
  70. },
  71. result: []map[string]interface{}{{
  72. "a": strings.ToLower("2FD4E1C67A2D28FCED849EE1BB76E7391B93EB12"),
  73. }},
  74. },
  75. {
  76. sql: "SELECT sha256(a) AS a FROM test",
  77. data: &xsql.Tuple{
  78. Emitter: "test",
  79. Message: xsql.Message{
  80. "a": "The quick brown fox jumps over the lazy dog",
  81. "b": "myb",
  82. "c": "myc",
  83. },
  84. },
  85. result: []map[string]interface{}{{
  86. "a": strings.ToLower("D7A8FBB307D7809469CA9ABCB0082E4F8D5651E46D3CDB762D02D0BF37C9E592"),
  87. }},
  88. },
  89. {
  90. sql: "SELECT sha384(a) AS a FROM test",
  91. data: &xsql.Tuple{
  92. Emitter: "test",
  93. Message: xsql.Message{
  94. "a": "The quick brown fox jumps over the lazy dog",
  95. "b": "myb",
  96. "c": "myc",
  97. },
  98. },
  99. result: []map[string]interface{}{{
  100. "a": strings.ToLower("CA737F1014A48F4C0B6DD43CB177B0AFD9E5169367544C494011E3317DBF9A509CB1E5DC1E85A941BBEE3D7F2AFBC9B1"),
  101. }},
  102. },
  103. {
  104. sql: "SELECT sha512(a) AS a FROM test",
  105. data: &xsql.Tuple{
  106. Emitter: "test",
  107. Message: xsql.Message{
  108. "a": "The quick brown fox jumps over the lazy dog",
  109. "b": "myb",
  110. "c": "myc",
  111. },
  112. },
  113. result: []map[string]interface{}{{
  114. "a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
  115. }},
  116. },
  117. {
  118. sql: "SELECT mqtt(topic) AS a FROM test",
  119. data: &xsql.Tuple{
  120. Emitter: "test",
  121. Message: xsql.Message{},
  122. Metadata: xsql.Metadata{
  123. "topic": "devices/device_001/message",
  124. },
  125. },
  126. result: []map[string]interface{}{{
  127. "a": "devices/device_001/message",
  128. }},
  129. },
  130. {
  131. sql: "SELECT mqtt(topic) AS a FROM test",
  132. data: &xsql.Tuple{
  133. Emitter: "test",
  134. Message: xsql.Message{},
  135. Metadata: xsql.Metadata{
  136. "topic": "devices/device_001/message",
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "a": "devices/device_001/message",
  141. }},
  142. },
  143. {
  144. sql: "SELECT topic, mqtt(topic) AS a FROM test",
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "topic": "fff",
  149. },
  150. Metadata: xsql.Metadata{
  151. "topic": "devices/device_001/message",
  152. },
  153. },
  154. result: []map[string]interface{}{{
  155. "topic": "fff",
  156. "a": "devices/device_001/message",
  157. }},
  158. },
  159. {
  160. sql: "SELECT cardinality(arr) as r FROM test",
  161. data: &xsql.Tuple{
  162. Emitter: "test",
  163. Message: xsql.Message{
  164. "temperature": 43.2,
  165. "arr": []int{},
  166. },
  167. },
  168. result: []map[string]interface{}{{
  169. "r": 0,
  170. }},
  171. },
  172. {
  173. sql: "SELECT cardinality(arr) as r FROM test",
  174. data: &xsql.Tuple{
  175. Emitter: "test",
  176. Message: xsql.Message{
  177. "temperature": 43.2,
  178. "arr": []int{1, 2, 3, 4, 5},
  179. },
  180. },
  181. result: []map[string]interface{}{{
  182. "r": 5,
  183. }},
  184. },
  185. {
  186. sql: "SELECT isNull(arr) as r FROM test",
  187. data: &xsql.Tuple{
  188. Emitter: "test",
  189. Message: xsql.Message{
  190. "temperature": 43.2,
  191. "arr": []int{},
  192. },
  193. },
  194. result: []map[string]interface{}{{
  195. "r": false,
  196. }},
  197. },
  198. {
  199. sql: "SELECT isNull(arr) as r FROM test",
  200. data: &xsql.Tuple{
  201. Emitter: "test",
  202. Message: xsql.Message{
  203. "temperature": 43.2,
  204. "arr": []float64(nil),
  205. },
  206. },
  207. result: []map[string]interface{}{{
  208. "r": true,
  209. }},
  210. },
  211. {
  212. sql: "SELECT isNull(rec) as r FROM test",
  213. data: &xsql.Tuple{
  214. Emitter: "test",
  215. Message: xsql.Message{
  216. "temperature": 43.2,
  217. "rec": map[string]interface{}(nil),
  218. },
  219. },
  220. result: []map[string]interface{}{{
  221. "r": true,
  222. }},
  223. },
  224. {
  225. sql: "SELECT cast(a * 1000, \"datetime\") AS a FROM test",
  226. data: &xsql.Tuple{
  227. Emitter: "test",
  228. Message: xsql.Message{
  229. "a": 1.62000273e+09,
  230. "b": "ya",
  231. "c": "myc",
  232. },
  233. },
  234. result: []map[string]interface{}{{
  235. "a": cast.TimeFromUnixMilli(1.62000273e+12),
  236. }},
  237. },
  238. {
  239. sql: "SELECT rule_id() AS rule_id, rule_start() as rule_start FROM test",
  240. data: &xsql.Tuple{
  241. Emitter: "test",
  242. Message: xsql.Message{},
  243. },
  244. result: []map[string]interface{}{{
  245. "rule_id": "rule0",
  246. "rule_start": 12345,
  247. }},
  248. },
  249. }
  250. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  251. contextLogger := conf.Log.WithField("rule", "TestMiscFunc_Apply1")
  252. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  253. ctx = context.WithValue(ctx, context.RuleStartKey, 12345)
  254. ctx = ctx.WithMeta("rule0", "op1", &state.MemoryStore{}).(*context.DefaultContext)
  255. for i, tt := range tests {
  256. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  257. if err != nil || stmt == nil {
  258. t.Errorf("parse sql %s error %v", tt.sql, err)
  259. }
  260. pp := &ProjectOp{}
  261. parseStmt(pp, stmt.Fields)
  262. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  263. opResult := pp.Apply(ctx, tt.data, fv, afv)
  264. result, err := parseResult(opResult, pp.IsAggregate)
  265. if err != nil {
  266. t.Errorf("parse result error: %s", err)
  267. continue
  268. }
  269. if !reflect.DeepEqual(tt.result, result) {
  270. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  271. }
  272. }
  273. }
  274. func TestMqttFunc_Apply2(t *testing.T) {
  275. tests := []struct {
  276. sql string
  277. data *xsql.JoinTuples
  278. result []map[string]interface{}
  279. }{
  280. {
  281. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
  282. data: &xsql.JoinTuples{
  283. Content: []*xsql.JoinTuple{
  284. {
  285. Tuples: []xsql.TupleRow{
  286. &xsql.Tuple{Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  287. &xsql.Tuple{Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  288. },
  289. },
  290. },
  291. },
  292. result: []map[string]interface{}{{
  293. "id1": "1",
  294. "a": "devices/type1/device001",
  295. "b": "devices/type2/device001",
  296. }},
  297. },
  298. }
  299. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  300. contextLogger := conf.Log.WithField("rule", "TestMqttFunc_Apply2")
  301. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  302. for i, tt := range tests {
  303. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  304. if err != nil || stmt == nil {
  305. t.Errorf("parse sql %s error %v", tt.sql, err)
  306. }
  307. pp := &ProjectOp{}
  308. parseStmt(pp, stmt.Fields)
  309. fv, afv := xsql.NewFunctionValuersForOp(nil)
  310. opResult := pp.Apply(ctx, tt.data, fv, afv)
  311. result, err := parseResult(opResult, pp.IsAggregate)
  312. if err != nil {
  313. t.Errorf("parse result error: %s", err)
  314. continue
  315. }
  316. if !reflect.DeepEqual(tt.result, result) {
  317. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  318. }
  319. }
  320. }
  321. func TestMetaFunc_Apply1(t *testing.T) {
  322. tests := []struct {
  323. sql string
  324. data interface{}
  325. result interface{}
  326. }{
  327. {
  328. sql: "SELECT topic, meta(topic) AS a FROM test",
  329. data: &xsql.Tuple{
  330. Emitter: "test",
  331. Message: xsql.Message{
  332. "topic": "fff",
  333. },
  334. Metadata: xsql.Metadata{
  335. "topic": "devices/device_001/message",
  336. },
  337. },
  338. result: []map[string]interface{}{{
  339. "topic": "fff",
  340. "a": "devices/device_001/message",
  341. }},
  342. },
  343. {
  344. sql: "SELECT meta(device) as d, meta(temperature->device) as r FROM test",
  345. data: &xsql.Tuple{
  346. Emitter: "test",
  347. Message: xsql.Message{
  348. "temperature": 43.2,
  349. },
  350. Metadata: xsql.Metadata{
  351. "temperature": map[string]interface{}{
  352. "id": "dfadfasfas",
  353. "device": "device2",
  354. },
  355. "device": "gateway",
  356. },
  357. },
  358. result: []map[string]interface{}{{
  359. "d": "gateway",
  360. "r": "device2",
  361. }},
  362. },
  363. {
  364. sql: "SELECT meta(*) as r FROM test",
  365. data: &xsql.Tuple{
  366. Emitter: "test",
  367. Message: xsql.Message{
  368. "temperature": 43.2,
  369. },
  370. Metadata: xsql.Metadata{
  371. "temperature": map[string]interface{}{
  372. "id": "dfadfasfas",
  373. "device": "device2",
  374. },
  375. "device": "gateway",
  376. },
  377. },
  378. result: []map[string]interface{}{{
  379. "r": map[string]interface{}{
  380. "temperature": map[string]interface{}{
  381. "id": "dfadfasfas",
  382. "device": "device2",
  383. },
  384. "device": "gateway",
  385. },
  386. }},
  387. },
  388. {
  389. sql: "SELECT topic, meta(`Light-diming`->device) AS a FROM test",
  390. data: &xsql.Tuple{
  391. Emitter: "test",
  392. Message: xsql.Message{
  393. "topic": "fff",
  394. },
  395. Metadata: xsql.Metadata{
  396. "Light-diming": map[string]interface{}{
  397. "device": "device2",
  398. },
  399. },
  400. },
  401. result: []map[string]interface{}{{
  402. "topic": "fff",
  403. "a": "device2",
  404. }},
  405. },
  406. }
  407. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  408. contextLogger := conf.Log.WithField("rule", "TestMetaFunc_Apply1")
  409. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  410. for i, tt := range tests {
  411. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  412. if err != nil || stmt == nil {
  413. t.Errorf("parse sql %s error %v", tt.sql, err)
  414. }
  415. pp := &ProjectOp{}
  416. parseStmt(pp, stmt.Fields)
  417. fv, afv := xsql.NewFunctionValuersForOp(nil)
  418. opResult := pp.Apply(ctx, tt.data, fv, afv)
  419. result, err := parseResult(opResult, pp.IsAggregate)
  420. if err != nil {
  421. t.Errorf("parse result error: %s", err)
  422. continue
  423. }
  424. if !reflect.DeepEqual(tt.result, result) {
  425. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  426. }
  427. }
  428. }
  429. func TestJsonPathFunc_Apply1(t *testing.T) {
  430. tests := []struct {
  431. sql string
  432. data interface{}
  433. result interface{}
  434. err string
  435. }{
  436. {
  437. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  438. data: &xsql.Tuple{
  439. Emitter: "test",
  440. Message: xsql.Message{
  441. "class": "warrior",
  442. "equipment": map[string]interface{}{
  443. "rings": []map[string]interface{}{
  444. {
  445. "name": "ring of despair",
  446. "weight": 0.1,
  447. }, {
  448. "name": "ring of strength",
  449. "weight": 2.4,
  450. },
  451. },
  452. "arm_right": "Sword of flame",
  453. "arm_left": "Shield of faith",
  454. },
  455. },
  456. },
  457. result: []map[string]interface{}{{
  458. "a": "Sword of flame",
  459. }},
  460. }, {
  461. sql: `SELECT json_path_query(equipment, "$.rings[*].weight") AS a FROM test`,
  462. data: &xsql.Tuple{
  463. Emitter: "test",
  464. Message: xsql.Message{
  465. "class": "warrior",
  466. "equipment": map[string]interface{}{
  467. "rings": []interface{}{
  468. map[string]interface{}{
  469. "name": "ring of despair",
  470. "weight": 0.1,
  471. }, map[string]interface{}{
  472. "name": "ring of strength",
  473. "weight": 2.4,
  474. },
  475. },
  476. "arm_right": "Sword of flame",
  477. "arm_left": "Shield of faith",
  478. },
  479. },
  480. },
  481. result: []map[string]interface{}{{
  482. "a": []interface{}{
  483. 0.1, 2.4,
  484. },
  485. }},
  486. }, {
  487. sql: `SELECT json_path_query_first(equipment, "$.rings[*].weight") AS a FROM test`,
  488. data: &xsql.Tuple{
  489. Emitter: "test",
  490. Message: xsql.Message{
  491. "class": "warrior",
  492. "equipment": map[string]interface{}{
  493. "rings": []interface{}{
  494. map[string]interface{}{
  495. "name": "ring of despair",
  496. "weight": 0.1,
  497. }, map[string]interface{}{
  498. "name": "ring of strength",
  499. "weight": 2.4,
  500. },
  501. },
  502. "arm_right": "Sword of flame",
  503. "arm_left": "Shield of faith",
  504. },
  505. },
  506. },
  507. result: []map[string]interface{}{{
  508. "a": 0.1,
  509. }},
  510. }, {
  511. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1]") AS a FROM test`,
  512. data: &xsql.Tuple{
  513. Emitter: "test",
  514. Message: xsql.Message{
  515. "class": "warrior",
  516. "equipment": map[string]interface{}{
  517. "rings": []interface{}{
  518. map[string]interface{}{
  519. "name": "ring of despair",
  520. "weight": 0.1,
  521. }, map[string]interface{}{
  522. "name": "ring of strength",
  523. "weight": 2.4,
  524. },
  525. },
  526. "arm_right": "Sword of flame",
  527. "arm_left": "Shield of faith",
  528. },
  529. },
  530. },
  531. result: []map[string]interface{}{{
  532. "a": []interface{}{
  533. map[string]interface{}{
  534. "name": "ring of strength",
  535. "weight": 2.4,
  536. },
  537. },
  538. }},
  539. }, {
  540. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1].name") AS a FROM test`,
  541. data: &xsql.Tuple{
  542. Emitter: "test",
  543. Message: xsql.Message{
  544. "class": "warrior",
  545. "equipment": map[string]interface{}{
  546. "rings": []interface{}{
  547. map[string]interface{}{
  548. "name": "ring of despair",
  549. "weight": 0.1,
  550. }, map[string]interface{}{
  551. "name": "ring of strength",
  552. "weight": 2.4,
  553. },
  554. },
  555. "arm_right": "Sword of flame",
  556. "arm_left": "Shield of faith",
  557. },
  558. },
  559. },
  560. result: []map[string]interface{}{{
  561. "a": []interface{}{
  562. "ring of strength",
  563. },
  564. }},
  565. }, {
  566. sql: `SELECT json_path_exists(equipment, "$.rings[? @.weight>5]") AS a FROM test`,
  567. data: &xsql.Tuple{
  568. Emitter: "test",
  569. Message: xsql.Message{
  570. "class": "warrior",
  571. "equipment": map[string]interface{}{
  572. "rings": []interface{}{
  573. map[string]interface{}{
  574. "name": "ring of despair",
  575. "weight": 0.1,
  576. }, map[string]interface{}{
  577. "name": "ring of strength",
  578. "weight": 2.4,
  579. },
  580. },
  581. "arm_right": "Sword of flame",
  582. "arm_left": "Shield of faith",
  583. },
  584. },
  585. },
  586. result: []map[string]interface{}{{
  587. "a": false,
  588. }},
  589. }, {
  590. sql: `SELECT json_path_exists(equipment, "$.ring1") AS a FROM test`,
  591. data: &xsql.Tuple{
  592. Emitter: "test",
  593. Message: xsql.Message{
  594. "class": "warrior",
  595. "equipment": map[string]interface{}{
  596. "rings": []interface{}{
  597. map[string]interface{}{
  598. "name": "ring of despair",
  599. "weight": 0.1,
  600. }, map[string]interface{}{
  601. "name": "ring of strength",
  602. "weight": 2.4,
  603. },
  604. },
  605. "arm_right": "Sword of flame",
  606. "arm_left": "Shield of faith",
  607. },
  608. },
  609. },
  610. result: []map[string]interface{}{{
  611. "a": false,
  612. }},
  613. }, {
  614. sql: `SELECT json_path_exists(equipment, "$.rings") AS a FROM test`,
  615. data: &xsql.Tuple{
  616. Emitter: "test",
  617. Message: xsql.Message{
  618. "class": "warrior",
  619. "equipment": map[string]interface{}{
  620. "rings": []interface{}{
  621. map[string]interface{}{
  622. "name": "ring of despair",
  623. "weight": 0.1,
  624. }, map[string]interface{}{
  625. "name": "ring of strength",
  626. "weight": 2.4,
  627. },
  628. },
  629. "arm_right": "Sword of flame",
  630. "arm_left": "Shield of faith",
  631. },
  632. },
  633. },
  634. result: []map[string]interface{}{{
  635. "a": true,
  636. }},
  637. }, {
  638. sql: `SELECT json_path_query(equipment, "$.rings[? (@.weight>1)].name") AS a FROM test`,
  639. data: &xsql.Tuple{
  640. Emitter: "test",
  641. Message: xsql.Message{
  642. "class": "warrior",
  643. "equipment": map[string]interface{}{
  644. "rings": []map[string]interface{}{
  645. {
  646. "name": "ring of despair",
  647. "weight": 0.1,
  648. }, {
  649. "name": "ring of strength",
  650. "weight": 2.4,
  651. },
  652. },
  653. "arm_right": "Sword of flame",
  654. "arm_left": "Shield of faith",
  655. },
  656. },
  657. },
  658. result: []map[string]interface{}{{
  659. "a": []interface{}{
  660. "ring of strength",
  661. },
  662. }},
  663. }, {
  664. sql: `SELECT json_path_query(equipment, "$.rings[*]") AS a FROM test`,
  665. data: &xsql.Tuple{
  666. Emitter: "test",
  667. Message: xsql.Message{
  668. "class": "warrior",
  669. "equipment": map[string]interface{}{
  670. "rings": []float64{
  671. 0.1, 2.4,
  672. },
  673. "arm_right": "Sword of flame",
  674. "arm_left": "Shield of faith",
  675. },
  676. },
  677. },
  678. result: []map[string]interface{}{{
  679. "a": []interface{}{
  680. 0.1, 2.4,
  681. },
  682. }},
  683. }, {
  684. sql: `SELECT json_path_query(equipment, "$.rings") AS a FROM test`,
  685. data: &xsql.Tuple{
  686. Emitter: "test",
  687. Message: xsql.Message{
  688. "class": "warrior",
  689. "equipment": map[string]interface{}{
  690. "rings": []float64{
  691. 0.1, 2.4,
  692. },
  693. "arm_right": "Sword of flame",
  694. "arm_left": "Shield of faith",
  695. },
  696. },
  697. },
  698. result: []map[string]interface{}{{
  699. "a": []interface{}{
  700. 0.1, 2.4,
  701. },
  702. }},
  703. }, {
  704. sql: `SELECT json_path_query(equipment, "$[0].rings[1]") AS a FROM test`,
  705. data: &xsql.Tuple{
  706. Emitter: "test",
  707. Message: xsql.Message{
  708. "class": "warrior",
  709. "equipment": []map[string]interface{}{
  710. {
  711. "rings": []float64{
  712. 0.1, 2.4,
  713. },
  714. "arm_right": "Sword of flame",
  715. "arm_left": "Shield of faith",
  716. },
  717. },
  718. },
  719. },
  720. result: []map[string]interface{}{{
  721. "a": 2.4,
  722. }},
  723. }, {
  724. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  725. data: &xsql.Tuple{
  726. Emitter: "test",
  727. Message: xsql.Message{
  728. "class": "warrior",
  729. "equipment": []map[string]interface{}{
  730. {
  731. "rings": []float64{
  732. 0.1, 2.4,
  733. },
  734. "arm.right": "Sword of flame",
  735. "arm.left": "Shield of faith",
  736. },
  737. },
  738. },
  739. },
  740. result: []map[string]interface{}{{
  741. "a": "Shield of faith",
  742. }},
  743. }, {
  744. sql: "SELECT json_path_query(equipment, \"$[\\\"arm.left\\\"]\") AS a FROM test",
  745. data: &xsql.Tuple{
  746. Emitter: "test",
  747. Message: xsql.Message{
  748. "class": "warrior",
  749. "equipment": `{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}`,
  750. },
  751. },
  752. result: []map[string]interface{}{{
  753. "a": "Shield of faith",
  754. }},
  755. }, {
  756. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  757. data: &xsql.Tuple{
  758. Emitter: "test",
  759. Message: xsql.Message{
  760. "class": "warrior",
  761. "equipment": `[{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}]`,
  762. },
  763. },
  764. result: []map[string]interface{}{{
  765. "a": "Shield of faith",
  766. }},
  767. }, {
  768. sql: `SELECT all[poi[-1] + 1]->ts as powerOnTs FROM test`,
  769. data: &xsql.Tuple{
  770. Emitter: "test",
  771. Message: xsql.Message{
  772. "all": []map[string]interface{}{
  773. {"SystemPowerMode": 0, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": float64(1), "ts": 0},
  774. {"SystemPowerMode": 0, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": float64(4), "ts": 500},
  775. {"SystemPowerMode": 2, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": 0, "ts": 1000},
  776. {"SystemPowerMode": 2, "VehicleSpeed": 10, "FLWdwPosition": 20, "FrontWiperSwitchStatus": 0, "ts": 60000},
  777. {"SystemPowerMode": 2, "VehicleSpeed": 10, "FLWdwPosition": 20, "FrontWiperSwitchStatus": 0, "ts": 89500},
  778. {"SystemPowerMode": 2, "VehicleSpeed": 20, "FLWdwPosition": 50, "FrontWiperSwitchStatus": 5, "ts": 90000},
  779. {"SystemPowerMode": 2, "VehicleSpeed": 40, "FLWdwPosition": 60, "FrontWiperSwitchStatus": 5, "ts": 121000},
  780. },
  781. "poi": []interface{}{0, 1},
  782. },
  783. },
  784. result: []map[string]interface{}{{
  785. "powerOnTs": 1000,
  786. }},
  787. }, {
  788. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  789. data: &xsql.Tuple{
  790. Emitter: "test",
  791. Message: xsql.Message{
  792. "class": "warrior",
  793. "equipment2": map[string]interface{}{
  794. "rings": []map[string]interface{}{
  795. {
  796. "name": "ring of despair",
  797. "weight": 0.1,
  798. }, {
  799. "name": "ring of strength",
  800. "weight": 2.4,
  801. },
  802. },
  803. "arm_right": "Sword of flame",
  804. "arm_left": "Shield of faith",
  805. },
  806. },
  807. },
  808. err: "run Select error: call func json_path_query error: invalid data nil for jsonpath",
  809. },
  810. }
  811. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  812. contextLogger := conf.Log.WithField("rule", "TestJsonFunc_Apply1")
  813. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  814. for i, tt := range tests {
  815. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  816. if err != nil || stmt == nil {
  817. t.Errorf("parse sql %s error %v", tt.sql, err)
  818. }
  819. pp := &ProjectOp{}
  820. parseStmt(pp, stmt.Fields)
  821. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  822. opResult := pp.Apply(ctx, tt.data, fv, afv)
  823. if rt, ok := opResult.(error); ok {
  824. if tt.err == "" {
  825. t.Errorf("%d: got error:\n exp=%s\n got=%s\n\n", i, tt.result, rt)
  826. } else if !reflect.DeepEqual(tt.err, testx.Errstring(rt)) {
  827. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.err, rt)
  828. }
  829. } else {
  830. result, _ := parseResult(opResult, pp.IsAggregate)
  831. if tt.err == "" {
  832. if !reflect.DeepEqual(tt.result, result) {
  833. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  834. }
  835. } else {
  836. t.Errorf("%d: invalid result:\n exp error %s\n got=%s\n\n", i, tt.err, result)
  837. }
  838. }
  839. }
  840. }
  841. func TestChangedFuncs_Apply1(t *testing.T) {
  842. tests := []struct {
  843. sql string
  844. data []interface{}
  845. result [][]map[string]interface{}
  846. }{
  847. {
  848. sql: `SELECT changed_col(true, a), b FROM test`,
  849. data: []interface{}{
  850. &xsql.Tuple{
  851. Emitter: "test",
  852. Message: xsql.Message{
  853. "a": "a1",
  854. "b": "b1",
  855. "c": "c1",
  856. },
  857. },
  858. &xsql.Tuple{
  859. Emitter: "test",
  860. Message: xsql.Message{
  861. "a": "a1",
  862. "b": "b2",
  863. "c": "c1",
  864. },
  865. },
  866. &xsql.Tuple{
  867. Emitter: "test",
  868. Message: xsql.Message{
  869. "a": "a1",
  870. "c": "c1",
  871. },
  872. },
  873. &xsql.Tuple{
  874. Emitter: "test",
  875. Message: xsql.Message{
  876. "a": "a1",
  877. "b": "b2",
  878. "c": "c2",
  879. },
  880. },
  881. },
  882. result: [][]map[string]interface{}{{{
  883. "changed_col": "a1",
  884. "b": "b1",
  885. }}, {{
  886. "b": "b2",
  887. }}, {{}}, {{
  888. "b": "b2",
  889. }}},
  890. }, {
  891. sql: `SELECT changed_col(true, *) FROM test`,
  892. data: []interface{}{
  893. &xsql.Tuple{
  894. Emitter: "test",
  895. Message: xsql.Message{
  896. "a": "a1",
  897. "b": "b1",
  898. },
  899. },
  900. &xsql.Tuple{
  901. Emitter: "test",
  902. Message: xsql.Message{
  903. "a": "a1",
  904. "c": "c1",
  905. },
  906. },
  907. &xsql.Tuple{
  908. Emitter: "test",
  909. Message: xsql.Message{
  910. "a": "a1",
  911. "c": "c1",
  912. },
  913. },
  914. &xsql.Tuple{
  915. Emitter: "test",
  916. Message: xsql.Message{
  917. "a": "a1",
  918. "b": "b2",
  919. "c": "c2",
  920. },
  921. },
  922. },
  923. result: [][]map[string]interface{}{{{
  924. "changed_col": map[string]interface{}{
  925. "a": "a1",
  926. "b": "b1",
  927. },
  928. }}, {{
  929. "changed_col": map[string]interface{}{
  930. "a": "a1",
  931. "c": "c1",
  932. },
  933. }}, {{}}, {{
  934. "changed_col": map[string]interface{}{
  935. "a": "a1",
  936. "b": "b2",
  937. "c": "c2",
  938. },
  939. }}},
  940. },
  941. }
  942. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  943. contextLogger := conf.Log.WithField("rule", "TestChangedFuncs_Apply1")
  944. for i, tt := range tests {
  945. tempStore, _ := state.CreateStore("mockRule"+strconv.Itoa(i), api.AtMostOnce)
  946. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta("mockRule"+strconv.Itoa(i), "project", tempStore)
  947. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  948. if err != nil || stmt == nil {
  949. t.Errorf("parse sql %s error %v", tt.sql, err)
  950. }
  951. pp := &ProjectOp{}
  952. parseStmt(pp, stmt.Fields)
  953. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  954. r := make([][]map[string]interface{}, 0, len(tt.data))
  955. for _, d := range tt.data {
  956. opResult := pp.Apply(ctx, d, fv, afv)
  957. result, err := parseResult(opResult, pp.IsAggregate)
  958. if err != nil {
  959. t.Errorf("parse result error: %s", err)
  960. continue
  961. }
  962. r = append(r, result)
  963. }
  964. if !reflect.DeepEqual(tt.result, r) {
  965. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
  966. }
  967. }
  968. }
  969. func TestLagFuncs_Apply1(t *testing.T) {
  970. tests := []struct {
  971. sql string
  972. data []interface{}
  973. result [][]map[string]interface{}
  974. }{
  975. {
  976. sql: `SELECT lag(a) as a, lag(b) as b FROM test`,
  977. data: []interface{}{
  978. &xsql.Tuple{
  979. Emitter: "test",
  980. Message: xsql.Message{
  981. "a": "a1",
  982. "b": "b1",
  983. "c": "c1",
  984. },
  985. },
  986. &xsql.Tuple{
  987. Emitter: "test",
  988. Message: xsql.Message{
  989. "a": "a1",
  990. "b": "b2",
  991. "c": "c1",
  992. },
  993. },
  994. &xsql.Tuple{
  995. Emitter: "test",
  996. Message: xsql.Message{
  997. "a": "a1",
  998. "c": "c1",
  999. },
  1000. },
  1001. &xsql.Tuple{
  1002. Emitter: "test",
  1003. Message: xsql.Message{
  1004. "a": "a1",
  1005. "b": "b2",
  1006. "c": "c2",
  1007. },
  1008. },
  1009. },
  1010. result: [][]map[string]interface{}{{{}}, {{
  1011. "a": "a1",
  1012. "b": "b1",
  1013. }}, {{
  1014. "a": "a1",
  1015. "b": "b2",
  1016. }}, {{
  1017. "a": "a1",
  1018. }}},
  1019. },
  1020. {
  1021. sql: `SELECT lag(a, 2, "a10") as a FROM test`,
  1022. data: []interface{}{
  1023. &xsql.Tuple{
  1024. Emitter: "test",
  1025. Message: xsql.Message{
  1026. "a": "a1",
  1027. "b": "b1",
  1028. },
  1029. },
  1030. &xsql.Tuple{
  1031. Emitter: "test",
  1032. Message: xsql.Message{
  1033. "a": "a2",
  1034. "c": "c1",
  1035. },
  1036. },
  1037. &xsql.Tuple{
  1038. Emitter: "test",
  1039. Message: xsql.Message{
  1040. "a": "a1",
  1041. "c": "c1",
  1042. },
  1043. },
  1044. &xsql.Tuple{
  1045. Emitter: "test",
  1046. Message: xsql.Message{
  1047. "a": "a1",
  1048. "b": "b2",
  1049. "c": "c2",
  1050. },
  1051. },
  1052. },
  1053. result: [][]map[string]interface{}{{{
  1054. "a": "a10",
  1055. }}, {{
  1056. "a": "a10",
  1057. }}, {{
  1058. "a": "a1",
  1059. }}, {{
  1060. "a": "a2",
  1061. }}},
  1062. },
  1063. {
  1064. sql: `SELECT lag(a, 2) as a FROM test`,
  1065. data: []interface{}{
  1066. &xsql.Tuple{
  1067. Emitter: "test",
  1068. Message: xsql.Message{
  1069. "a": "a1",
  1070. "b": "b1",
  1071. },
  1072. },
  1073. &xsql.Tuple{
  1074. Emitter: "test",
  1075. Message: xsql.Message{
  1076. "a": "a2",
  1077. "c": "c1",
  1078. },
  1079. },
  1080. &xsql.Tuple{
  1081. Emitter: "test",
  1082. Message: xsql.Message{
  1083. "a": "a1",
  1084. "c": "c1",
  1085. },
  1086. },
  1087. &xsql.Tuple{
  1088. Emitter: "test",
  1089. Message: xsql.Message{
  1090. "a": "a1",
  1091. "b": "b2",
  1092. "c": "c2",
  1093. },
  1094. },
  1095. },
  1096. result: [][]map[string]interface{}{{{}}, {{}}, {{
  1097. "a": "a1",
  1098. }}, {{
  1099. "a": "a2",
  1100. }}},
  1101. },
  1102. }
  1103. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1104. contextLogger := conf.Log.WithField("rule", "TestChangedFuncs_Apply1")
  1105. for i, tt := range tests {
  1106. tempStore, _ := state.CreateStore("mockRule"+strconv.Itoa(i), api.AtMostOnce)
  1107. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta("mockRule"+strconv.Itoa(i), "project", tempStore)
  1108. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1109. if err != nil || stmt == nil {
  1110. t.Errorf("parse sql %s error %v", tt.sql, err)
  1111. }
  1112. pp := &ProjectOp{}
  1113. parseStmt(pp, stmt.Fields)
  1114. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  1115. r := make([][]map[string]interface{}, 0, len(tt.data))
  1116. for _, d := range tt.data {
  1117. opResult := pp.Apply(ctx, d, fv, afv)
  1118. result, err := parseResult(opResult, pp.IsAggregate)
  1119. if err != nil {
  1120. t.Errorf("parse result error: %s", err)
  1121. continue
  1122. }
  1123. r = append(r, result)
  1124. }
  1125. if !reflect.DeepEqual(tt.result, r) {
  1126. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
  1127. }
  1128. }
  1129. }