misc_func_test.go 28 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/testx"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "github.com/lf-edge/ekuiper/internal/topo/state"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. "reflect"
  25. "strconv"
  26. "strings"
  27. "testing"
  28. )
  29. func TestMiscFunc_Apply1(t *testing.T) {
  30. var tests = []struct {
  31. sql string
  32. data *xsql.Tuple
  33. result []map[string]interface{}
  34. }{
  35. {
  36. sql: "SELECT md5(a) AS a FROM test",
  37. data: &xsql.Tuple{
  38. Emitter: "test",
  39. Message: xsql.Message{
  40. "a": "The quick brown fox jumps over the lazy dog",
  41. "b": "myb",
  42. "c": "myc",
  43. },
  44. },
  45. result: []map[string]interface{}{{
  46. "a": strings.ToLower("9E107D9D372BB6826BD81D3542A419D6"),
  47. }},
  48. },
  49. {
  50. sql: "SELECT md5(d) AS a FROM test",
  51. data: &xsql.Tuple{
  52. Emitter: "test",
  53. Message: xsql.Message{
  54. "a": "The quick brown fox jumps over the lazy dog",
  55. "b": "myb",
  56. "c": "myc",
  57. },
  58. },
  59. result: []map[string]interface{}{{}},
  60. },
  61. {
  62. sql: "SELECT sha1(a) AS a FROM test",
  63. data: &xsql.Tuple{
  64. Emitter: "test",
  65. Message: xsql.Message{
  66. "a": "The quick brown fox jumps over the lazy dog",
  67. "b": "myb",
  68. "c": "myc",
  69. },
  70. },
  71. result: []map[string]interface{}{{
  72. "a": strings.ToLower("2FD4E1C67A2D28FCED849EE1BB76E7391B93EB12"),
  73. }},
  74. },
  75. {
  76. sql: "SELECT sha256(a) AS a FROM test",
  77. data: &xsql.Tuple{
  78. Emitter: "test",
  79. Message: xsql.Message{
  80. "a": "The quick brown fox jumps over the lazy dog",
  81. "b": "myb",
  82. "c": "myc",
  83. },
  84. },
  85. result: []map[string]interface{}{{
  86. "a": strings.ToLower("D7A8FBB307D7809469CA9ABCB0082E4F8D5651E46D3CDB762D02D0BF37C9E592"),
  87. }},
  88. },
  89. {
  90. sql: "SELECT sha384(a) AS a FROM test",
  91. data: &xsql.Tuple{
  92. Emitter: "test",
  93. Message: xsql.Message{
  94. "a": "The quick brown fox jumps over the lazy dog",
  95. "b": "myb",
  96. "c": "myc",
  97. },
  98. },
  99. result: []map[string]interface{}{{
  100. "a": strings.ToLower("CA737F1014A48F4C0B6DD43CB177B0AFD9E5169367544C494011E3317DBF9A509CB1E5DC1E85A941BBEE3D7F2AFBC9B1"),
  101. }},
  102. },
  103. {
  104. sql: "SELECT sha512(a) AS a FROM test",
  105. data: &xsql.Tuple{
  106. Emitter: "test",
  107. Message: xsql.Message{
  108. "a": "The quick brown fox jumps over the lazy dog",
  109. "b": "myb",
  110. "c": "myc",
  111. },
  112. },
  113. result: []map[string]interface{}{{
  114. "a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
  115. }},
  116. },
  117. {
  118. sql: "SELECT mqtt(topic) AS a FROM test",
  119. data: &xsql.Tuple{
  120. Emitter: "test",
  121. Message: xsql.Message{},
  122. Metadata: xsql.Metadata{
  123. "topic": "devices/device_001/message",
  124. },
  125. },
  126. result: []map[string]interface{}{{
  127. "a": "devices/device_001/message",
  128. }},
  129. },
  130. {
  131. sql: "SELECT mqtt(topic) AS a FROM test",
  132. data: &xsql.Tuple{
  133. Emitter: "test",
  134. Message: xsql.Message{},
  135. Metadata: xsql.Metadata{
  136. "topic": "devices/device_001/message",
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "a": "devices/device_001/message",
  141. }},
  142. },
  143. {
  144. sql: "SELECT topic, mqtt(topic) AS a FROM test",
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "topic": "fff",
  149. },
  150. Metadata: xsql.Metadata{
  151. "topic": "devices/device_001/message",
  152. },
  153. },
  154. result: []map[string]interface{}{{
  155. "topic": "fff",
  156. "a": "devices/device_001/message",
  157. }},
  158. },
  159. {
  160. sql: "SELECT cardinality(arr) as r FROM test",
  161. data: &xsql.Tuple{
  162. Emitter: "test",
  163. Message: xsql.Message{
  164. "temperature": 43.2,
  165. "arr": []int{},
  166. },
  167. },
  168. result: []map[string]interface{}{{
  169. "r": 0,
  170. }},
  171. },
  172. {
  173. sql: "SELECT cardinality(arr) as r FROM test",
  174. data: &xsql.Tuple{
  175. Emitter: "test",
  176. Message: xsql.Message{
  177. "temperature": 43.2,
  178. "arr": []int{1, 2, 3, 4, 5},
  179. },
  180. },
  181. result: []map[string]interface{}{{
  182. "r": 5,
  183. }},
  184. },
  185. {
  186. sql: "SELECT isNull(arr) as r FROM test",
  187. data: &xsql.Tuple{
  188. Emitter: "test",
  189. Message: xsql.Message{
  190. "temperature": 43.2,
  191. "arr": []int{},
  192. },
  193. },
  194. result: []map[string]interface{}{{
  195. "r": false,
  196. }},
  197. },
  198. {
  199. sql: "SELECT isNull(arr) as r FROM test",
  200. data: &xsql.Tuple{
  201. Emitter: "test",
  202. Message: xsql.Message{
  203. "temperature": 43.2,
  204. "arr": []float64(nil),
  205. },
  206. },
  207. result: []map[string]interface{}{{
  208. "r": true,
  209. }},
  210. },
  211. {
  212. sql: "SELECT isNull(rec) as r FROM test",
  213. data: &xsql.Tuple{
  214. Emitter: "test",
  215. Message: xsql.Message{
  216. "temperature": 43.2,
  217. "rec": map[string]interface{}(nil),
  218. },
  219. },
  220. result: []map[string]interface{}{{
  221. "r": true,
  222. }},
  223. },
  224. {
  225. sql: "SELECT cast(a * 1000, \"datetime\") AS a FROM test",
  226. data: &xsql.Tuple{
  227. Emitter: "test",
  228. Message: xsql.Message{
  229. "a": 1.62000273e+09,
  230. "b": "ya",
  231. "c": "myc",
  232. },
  233. },
  234. result: []map[string]interface{}{{
  235. "a": cast.TimeFromUnixMilli(1.62000273e+12),
  236. }},
  237. },
  238. }
  239. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  240. contextLogger := conf.Log.WithField("rule", "TestMiscFunc_Apply1")
  241. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  242. for i, tt := range tests {
  243. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  244. if err != nil || stmt == nil {
  245. t.Errorf("parse sql %s error %v", tt.sql, err)
  246. }
  247. pp := &ProjectOp{}
  248. parseStmt(pp, stmt.Fields)
  249. fv, afv := xsql.NewFunctionValuersForOp(nil)
  250. opResult := pp.Apply(ctx, tt.data, fv, afv)
  251. result, err := parseResult(opResult, pp.IsAggregate)
  252. if err != nil {
  253. t.Errorf("parse result error: %s", err)
  254. continue
  255. }
  256. if !reflect.DeepEqual(tt.result, result) {
  257. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  258. }
  259. }
  260. }
  261. func TestMqttFunc_Apply2(t *testing.T) {
  262. var tests = []struct {
  263. sql string
  264. data *xsql.JoinTuples
  265. result []map[string]interface{}
  266. }{
  267. {
  268. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
  269. data: &xsql.JoinTuples{
  270. Content: []*xsql.JoinTuple{
  271. {
  272. Tuples: []xsql.TupleRow{
  273. &xsql.Tuple{Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  274. &xsql.Tuple{Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  275. },
  276. },
  277. },
  278. },
  279. result: []map[string]interface{}{{
  280. "id1": "1",
  281. "a": "devices/type1/device001",
  282. "b": "devices/type2/device001",
  283. }},
  284. },
  285. }
  286. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  287. contextLogger := conf.Log.WithField("rule", "TestMqttFunc_Apply2")
  288. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  289. for i, tt := range tests {
  290. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  291. if err != nil || stmt == nil {
  292. t.Errorf("parse sql %s error %v", tt.sql, err)
  293. }
  294. pp := &ProjectOp{}
  295. parseStmt(pp, stmt.Fields)
  296. fv, afv := xsql.NewFunctionValuersForOp(nil)
  297. opResult := pp.Apply(ctx, tt.data, fv, afv)
  298. result, err := parseResult(opResult, pp.IsAggregate)
  299. if err != nil {
  300. t.Errorf("parse result error: %s", err)
  301. continue
  302. }
  303. if !reflect.DeepEqual(tt.result, result) {
  304. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  305. }
  306. }
  307. }
  308. func TestMetaFunc_Apply1(t *testing.T) {
  309. var tests = []struct {
  310. sql string
  311. data interface{}
  312. result interface{}
  313. }{
  314. {
  315. sql: "SELECT topic, meta(topic) AS a FROM test",
  316. data: &xsql.Tuple{
  317. Emitter: "test",
  318. Message: xsql.Message{
  319. "topic": "fff",
  320. },
  321. Metadata: xsql.Metadata{
  322. "topic": "devices/device_001/message",
  323. },
  324. },
  325. result: []map[string]interface{}{{
  326. "topic": "fff",
  327. "a": "devices/device_001/message",
  328. }},
  329. },
  330. {
  331. sql: "SELECT meta(device) as d, meta(temperature->device) as r FROM test",
  332. data: &xsql.Tuple{
  333. Emitter: "test",
  334. Message: xsql.Message{
  335. "temperature": 43.2,
  336. },
  337. Metadata: xsql.Metadata{
  338. "temperature": map[string]interface{}{
  339. "id": "dfadfasfas",
  340. "device": "device2",
  341. },
  342. "device": "gateway",
  343. },
  344. },
  345. result: []map[string]interface{}{{
  346. "d": "gateway",
  347. "r": "device2",
  348. }},
  349. },
  350. {
  351. sql: "SELECT meta(*) as r FROM test",
  352. data: &xsql.Tuple{
  353. Emitter: "test",
  354. Message: xsql.Message{
  355. "temperature": 43.2,
  356. },
  357. Metadata: xsql.Metadata{
  358. "temperature": map[string]interface{}{
  359. "id": "dfadfasfas",
  360. "device": "device2",
  361. },
  362. "device": "gateway",
  363. },
  364. },
  365. result: []map[string]interface{}{{
  366. "r": map[string]interface{}{
  367. "temperature": map[string]interface{}{
  368. "id": "dfadfasfas",
  369. "device": "device2",
  370. },
  371. "device": "gateway",
  372. },
  373. }},
  374. },
  375. {
  376. sql: "SELECT topic, meta(`Light-diming`->device) AS a FROM test",
  377. data: &xsql.Tuple{
  378. Emitter: "test",
  379. Message: xsql.Message{
  380. "topic": "fff",
  381. },
  382. Metadata: xsql.Metadata{
  383. "Light-diming": map[string]interface{}{
  384. "device": "device2",
  385. },
  386. },
  387. },
  388. result: []map[string]interface{}{{
  389. "topic": "fff",
  390. "a": "device2",
  391. }},
  392. },
  393. }
  394. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  395. contextLogger := conf.Log.WithField("rule", "TestMetaFunc_Apply1")
  396. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  397. for i, tt := range tests {
  398. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  399. if err != nil || stmt == nil {
  400. t.Errorf("parse sql %s error %v", tt.sql, err)
  401. }
  402. pp := &ProjectOp{}
  403. parseStmt(pp, stmt.Fields)
  404. fv, afv := xsql.NewFunctionValuersForOp(nil)
  405. opResult := pp.Apply(ctx, tt.data, fv, afv)
  406. result, err := parseResult(opResult, pp.IsAggregate)
  407. if err != nil {
  408. t.Errorf("parse result error: %s", err)
  409. continue
  410. }
  411. if !reflect.DeepEqual(tt.result, result) {
  412. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  413. }
  414. }
  415. }
  416. func TestJsonPathFunc_Apply1(t *testing.T) {
  417. var tests = []struct {
  418. sql string
  419. data interface{}
  420. result interface{}
  421. err string
  422. }{
  423. {
  424. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  425. data: &xsql.Tuple{
  426. Emitter: "test",
  427. Message: xsql.Message{
  428. "class": "warrior",
  429. "equipment": map[string]interface{}{
  430. "rings": []map[string]interface{}{
  431. {
  432. "name": "ring of despair",
  433. "weight": 0.1,
  434. }, {
  435. "name": "ring of strength",
  436. "weight": 2.4,
  437. },
  438. },
  439. "arm_right": "Sword of flame",
  440. "arm_left": "Shield of faith",
  441. },
  442. },
  443. },
  444. result: []map[string]interface{}{{
  445. "a": "Sword of flame",
  446. }},
  447. }, {
  448. sql: `SELECT json_path_query(equipment, "$.rings[*].weight") AS a FROM test`,
  449. data: &xsql.Tuple{
  450. Emitter: "test",
  451. Message: xsql.Message{
  452. "class": "warrior",
  453. "equipment": map[string]interface{}{
  454. "rings": []interface{}{
  455. map[string]interface{}{
  456. "name": "ring of despair",
  457. "weight": 0.1,
  458. }, map[string]interface{}{
  459. "name": "ring of strength",
  460. "weight": 2.4,
  461. },
  462. },
  463. "arm_right": "Sword of flame",
  464. "arm_left": "Shield of faith",
  465. },
  466. },
  467. },
  468. result: []map[string]interface{}{{
  469. "a": []interface{}{
  470. 0.1, 2.4,
  471. },
  472. }},
  473. }, {
  474. sql: `SELECT json_path_query_first(equipment, "$.rings[*].weight") AS a FROM test`,
  475. data: &xsql.Tuple{
  476. Emitter: "test",
  477. Message: xsql.Message{
  478. "class": "warrior",
  479. "equipment": map[string]interface{}{
  480. "rings": []interface{}{
  481. map[string]interface{}{
  482. "name": "ring of despair",
  483. "weight": 0.1,
  484. }, map[string]interface{}{
  485. "name": "ring of strength",
  486. "weight": 2.4,
  487. },
  488. },
  489. "arm_right": "Sword of flame",
  490. "arm_left": "Shield of faith",
  491. },
  492. },
  493. },
  494. result: []map[string]interface{}{{
  495. "a": 0.1,
  496. }},
  497. }, {
  498. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1]") AS a FROM test`,
  499. data: &xsql.Tuple{
  500. Emitter: "test",
  501. Message: xsql.Message{
  502. "class": "warrior",
  503. "equipment": map[string]interface{}{
  504. "rings": []interface{}{
  505. map[string]interface{}{
  506. "name": "ring of despair",
  507. "weight": 0.1,
  508. }, map[string]interface{}{
  509. "name": "ring of strength",
  510. "weight": 2.4,
  511. },
  512. },
  513. "arm_right": "Sword of flame",
  514. "arm_left": "Shield of faith",
  515. },
  516. },
  517. },
  518. result: []map[string]interface{}{{
  519. "a": []interface{}{
  520. map[string]interface{}{
  521. "name": "ring of strength",
  522. "weight": 2.4,
  523. },
  524. },
  525. }},
  526. }, {
  527. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1].name") AS a FROM test`,
  528. data: &xsql.Tuple{
  529. Emitter: "test",
  530. Message: xsql.Message{
  531. "class": "warrior",
  532. "equipment": map[string]interface{}{
  533. "rings": []interface{}{
  534. map[string]interface{}{
  535. "name": "ring of despair",
  536. "weight": 0.1,
  537. }, map[string]interface{}{
  538. "name": "ring of strength",
  539. "weight": 2.4,
  540. },
  541. },
  542. "arm_right": "Sword of flame",
  543. "arm_left": "Shield of faith",
  544. },
  545. },
  546. },
  547. result: []map[string]interface{}{{
  548. "a": []interface{}{
  549. "ring of strength",
  550. },
  551. }},
  552. }, {
  553. sql: `SELECT json_path_exists(equipment, "$.rings[? @.weight>5]") AS a FROM test`,
  554. data: &xsql.Tuple{
  555. Emitter: "test",
  556. Message: xsql.Message{
  557. "class": "warrior",
  558. "equipment": map[string]interface{}{
  559. "rings": []interface{}{
  560. map[string]interface{}{
  561. "name": "ring of despair",
  562. "weight": 0.1,
  563. }, map[string]interface{}{
  564. "name": "ring of strength",
  565. "weight": 2.4,
  566. },
  567. },
  568. "arm_right": "Sword of flame",
  569. "arm_left": "Shield of faith",
  570. },
  571. },
  572. },
  573. result: []map[string]interface{}{{
  574. "a": false,
  575. }},
  576. }, {
  577. sql: `SELECT json_path_exists(equipment, "$.ring1") AS a FROM test`,
  578. data: &xsql.Tuple{
  579. Emitter: "test",
  580. Message: xsql.Message{
  581. "class": "warrior",
  582. "equipment": map[string]interface{}{
  583. "rings": []interface{}{
  584. map[string]interface{}{
  585. "name": "ring of despair",
  586. "weight": 0.1,
  587. }, map[string]interface{}{
  588. "name": "ring of strength",
  589. "weight": 2.4,
  590. },
  591. },
  592. "arm_right": "Sword of flame",
  593. "arm_left": "Shield of faith",
  594. },
  595. },
  596. },
  597. result: []map[string]interface{}{{
  598. "a": false,
  599. }},
  600. }, {
  601. sql: `SELECT json_path_exists(equipment, "$.rings") AS a FROM test`,
  602. data: &xsql.Tuple{
  603. Emitter: "test",
  604. Message: xsql.Message{
  605. "class": "warrior",
  606. "equipment": map[string]interface{}{
  607. "rings": []interface{}{
  608. map[string]interface{}{
  609. "name": "ring of despair",
  610. "weight": 0.1,
  611. }, map[string]interface{}{
  612. "name": "ring of strength",
  613. "weight": 2.4,
  614. },
  615. },
  616. "arm_right": "Sword of flame",
  617. "arm_left": "Shield of faith",
  618. },
  619. },
  620. },
  621. result: []map[string]interface{}{{
  622. "a": true,
  623. }},
  624. }, {
  625. sql: `SELECT json_path_query(equipment, "$.rings[? (@.weight>1)].name") AS a FROM test`,
  626. data: &xsql.Tuple{
  627. Emitter: "test",
  628. Message: xsql.Message{
  629. "class": "warrior",
  630. "equipment": map[string]interface{}{
  631. "rings": []map[string]interface{}{
  632. {
  633. "name": "ring of despair",
  634. "weight": 0.1,
  635. }, {
  636. "name": "ring of strength",
  637. "weight": 2.4,
  638. },
  639. },
  640. "arm_right": "Sword of flame",
  641. "arm_left": "Shield of faith",
  642. },
  643. },
  644. },
  645. result: []map[string]interface{}{{
  646. "a": []interface{}{
  647. "ring of strength",
  648. },
  649. }},
  650. }, {
  651. sql: `SELECT json_path_query(equipment, "$.rings[*]") AS a FROM test`,
  652. data: &xsql.Tuple{
  653. Emitter: "test",
  654. Message: xsql.Message{
  655. "class": "warrior",
  656. "equipment": map[string]interface{}{
  657. "rings": []float64{
  658. 0.1, 2.4,
  659. },
  660. "arm_right": "Sword of flame",
  661. "arm_left": "Shield of faith",
  662. },
  663. },
  664. },
  665. result: []map[string]interface{}{{
  666. "a": []interface{}{
  667. 0.1, 2.4,
  668. },
  669. }},
  670. }, {
  671. sql: `SELECT json_path_query(equipment, "$.rings") AS a FROM test`,
  672. data: &xsql.Tuple{
  673. Emitter: "test",
  674. Message: xsql.Message{
  675. "class": "warrior",
  676. "equipment": map[string]interface{}{
  677. "rings": []float64{
  678. 0.1, 2.4,
  679. },
  680. "arm_right": "Sword of flame",
  681. "arm_left": "Shield of faith",
  682. },
  683. },
  684. },
  685. result: []map[string]interface{}{{
  686. "a": []interface{}{
  687. 0.1, 2.4,
  688. },
  689. }},
  690. }, {
  691. sql: `SELECT json_path_query(equipment, "$[0].rings[1]") AS a FROM test`,
  692. data: &xsql.Tuple{
  693. Emitter: "test",
  694. Message: xsql.Message{
  695. "class": "warrior",
  696. "equipment": []map[string]interface{}{
  697. {
  698. "rings": []float64{
  699. 0.1, 2.4,
  700. },
  701. "arm_right": "Sword of flame",
  702. "arm_left": "Shield of faith",
  703. },
  704. },
  705. },
  706. },
  707. result: []map[string]interface{}{{
  708. "a": 2.4,
  709. }},
  710. }, {
  711. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  712. data: &xsql.Tuple{
  713. Emitter: "test",
  714. Message: xsql.Message{
  715. "class": "warrior",
  716. "equipment": []map[string]interface{}{
  717. {
  718. "rings": []float64{
  719. 0.1, 2.4,
  720. },
  721. "arm.right": "Sword of flame",
  722. "arm.left": "Shield of faith",
  723. },
  724. },
  725. },
  726. },
  727. result: []map[string]interface{}{{
  728. "a": "Shield of faith",
  729. }},
  730. }, {
  731. sql: "SELECT json_path_query(equipment, \"$[\\\"arm.left\\\"]\") AS a FROM test",
  732. data: &xsql.Tuple{
  733. Emitter: "test",
  734. Message: xsql.Message{
  735. "class": "warrior",
  736. "equipment": `{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}`,
  737. },
  738. },
  739. result: []map[string]interface{}{{
  740. "a": "Shield of faith",
  741. }},
  742. }, {
  743. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  744. data: &xsql.Tuple{
  745. Emitter: "test",
  746. Message: xsql.Message{
  747. "class": "warrior",
  748. "equipment": `[{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}]`,
  749. },
  750. },
  751. result: []map[string]interface{}{{
  752. "a": "Shield of faith",
  753. }},
  754. }, {
  755. sql: `SELECT all[poi[-1] + 1]->ts as powerOnTs FROM test`,
  756. data: &xsql.Tuple{
  757. Emitter: "test",
  758. Message: xsql.Message{
  759. "all": []map[string]interface{}{
  760. {"SystemPowerMode": 0, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": float64(1), "ts": 0},
  761. {"SystemPowerMode": 0, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": float64(4), "ts": 500},
  762. {"SystemPowerMode": 2, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": 0, "ts": 1000},
  763. {"SystemPowerMode": 2, "VehicleSpeed": 10, "FLWdwPosition": 20, "FrontWiperSwitchStatus": 0, "ts": 60000},
  764. {"SystemPowerMode": 2, "VehicleSpeed": 10, "FLWdwPosition": 20, "FrontWiperSwitchStatus": 0, "ts": 89500},
  765. {"SystemPowerMode": 2, "VehicleSpeed": 20, "FLWdwPosition": 50, "FrontWiperSwitchStatus": 5, "ts": 90000},
  766. {"SystemPowerMode": 2, "VehicleSpeed": 40, "FLWdwPosition": 60, "FrontWiperSwitchStatus": 5, "ts": 121000},
  767. },
  768. "poi": []interface{}{0, 1},
  769. },
  770. },
  771. result: []map[string]interface{}{{
  772. "powerOnTs": 1000,
  773. }},
  774. }, {
  775. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  776. data: &xsql.Tuple{
  777. Emitter: "test",
  778. Message: xsql.Message{
  779. "class": "warrior",
  780. "equipment2": map[string]interface{}{
  781. "rings": []map[string]interface{}{
  782. {
  783. "name": "ring of despair",
  784. "weight": 0.1,
  785. }, {
  786. "name": "ring of strength",
  787. "weight": 2.4,
  788. },
  789. },
  790. "arm_right": "Sword of flame",
  791. "arm_left": "Shield of faith",
  792. },
  793. },
  794. },
  795. err: "run Select error: call func json_path_query error: invalid data nil for jsonpath",
  796. },
  797. }
  798. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  799. contextLogger := conf.Log.WithField("rule", "TestJsonFunc_Apply1")
  800. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  801. for i, tt := range tests {
  802. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  803. if err != nil || stmt == nil {
  804. t.Errorf("parse sql %s error %v", tt.sql, err)
  805. }
  806. pp := &ProjectOp{}
  807. parseStmt(pp, stmt.Fields)
  808. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  809. opResult := pp.Apply(ctx, tt.data, fv, afv)
  810. if rt, ok := opResult.(error); ok {
  811. if tt.err == "" {
  812. t.Errorf("%d: got error:\n exp=%s\n got=%s\n\n", i, tt.result, rt)
  813. } else if !reflect.DeepEqual(tt.err, testx.Errstring(rt)) {
  814. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.err, rt)
  815. }
  816. } else {
  817. result, _ := parseResult(opResult, pp.IsAggregate)
  818. if tt.err == "" {
  819. if !reflect.DeepEqual(tt.result, result) {
  820. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  821. }
  822. } else {
  823. t.Errorf("%d: invalid result:\n exp error %s\n got=%s\n\n", i, tt.err, result)
  824. }
  825. }
  826. }
  827. }
  828. func TestChangedFuncs_Apply1(t *testing.T) {
  829. var tests = []struct {
  830. sql string
  831. data []interface{}
  832. result [][]map[string]interface{}
  833. }{
  834. {
  835. sql: `SELECT changed_col(true, a), b FROM test`,
  836. data: []interface{}{
  837. &xsql.Tuple{
  838. Emitter: "test",
  839. Message: xsql.Message{
  840. "a": "a1",
  841. "b": "b1",
  842. "c": "c1",
  843. },
  844. },
  845. &xsql.Tuple{
  846. Emitter: "test",
  847. Message: xsql.Message{
  848. "a": "a1",
  849. "b": "b2",
  850. "c": "c1",
  851. },
  852. },
  853. &xsql.Tuple{
  854. Emitter: "test",
  855. Message: xsql.Message{
  856. "a": "a1",
  857. "c": "c1",
  858. },
  859. },
  860. &xsql.Tuple{
  861. Emitter: "test",
  862. Message: xsql.Message{
  863. "a": "a1",
  864. "b": "b2",
  865. "c": "c2",
  866. },
  867. },
  868. },
  869. result: [][]map[string]interface{}{{{
  870. "changed_col": "a1",
  871. "b": "b1",
  872. }}, {{
  873. "b": "b2",
  874. }}, {{}}, {{
  875. "b": "b2",
  876. }}},
  877. }, {
  878. sql: `SELECT changed_col(true, *) FROM test`,
  879. data: []interface{}{
  880. &xsql.Tuple{
  881. Emitter: "test",
  882. Message: xsql.Message{
  883. "a": "a1",
  884. "b": "b1",
  885. },
  886. },
  887. &xsql.Tuple{
  888. Emitter: "test",
  889. Message: xsql.Message{
  890. "a": "a1",
  891. "c": "c1",
  892. },
  893. },
  894. &xsql.Tuple{
  895. Emitter: "test",
  896. Message: xsql.Message{
  897. "a": "a1",
  898. "c": "c1",
  899. },
  900. },
  901. &xsql.Tuple{
  902. Emitter: "test",
  903. Message: xsql.Message{
  904. "a": "a1",
  905. "b": "b2",
  906. "c": "c2",
  907. },
  908. },
  909. },
  910. result: [][]map[string]interface{}{{{
  911. "changed_col": xsql.Message{
  912. "a": "a1",
  913. "b": "b1",
  914. },
  915. }}, {{
  916. "changed_col": xsql.Message{
  917. "a": "a1",
  918. "c": "c1",
  919. },
  920. }}, {{}}, {{
  921. "changed_col": xsql.Message{
  922. "a": "a1",
  923. "b": "b2",
  924. "c": "c2",
  925. },
  926. }}},
  927. },
  928. }
  929. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  930. contextLogger := conf.Log.WithField("rule", "TestChangedFuncs_Apply1")
  931. for i, tt := range tests {
  932. tempStore, _ := state.CreateStore("mockRule"+strconv.Itoa(i), api.AtMostOnce)
  933. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta("mockRule"+strconv.Itoa(i), "project", tempStore)
  934. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  935. if err != nil || stmt == nil {
  936. t.Errorf("parse sql %s error %v", tt.sql, err)
  937. }
  938. pp := &ProjectOp{}
  939. parseStmt(pp, stmt.Fields)
  940. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  941. r := make([][]map[string]interface{}, 0, len(tt.data))
  942. for _, d := range tt.data {
  943. opResult := pp.Apply(ctx, d, fv, afv)
  944. result, err := parseResult(opResult, pp.IsAggregate)
  945. if err != nil {
  946. t.Errorf("parse result error: %s", err)
  947. continue
  948. }
  949. r = append(r, result)
  950. }
  951. if !reflect.DeepEqual(tt.result, r) {
  952. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
  953. }
  954. }
  955. }
  956. func TestLagFuncs_Apply1(t *testing.T) {
  957. var tests = []struct {
  958. sql string
  959. data []interface{}
  960. result [][]map[string]interface{}
  961. }{
  962. {
  963. sql: `SELECT lag(a) as a, lag(b) as b FROM test`,
  964. data: []interface{}{
  965. &xsql.Tuple{
  966. Emitter: "test",
  967. Message: xsql.Message{
  968. "a": "a1",
  969. "b": "b1",
  970. "c": "c1",
  971. },
  972. },
  973. &xsql.Tuple{
  974. Emitter: "test",
  975. Message: xsql.Message{
  976. "a": "a1",
  977. "b": "b2",
  978. "c": "c1",
  979. },
  980. },
  981. &xsql.Tuple{
  982. Emitter: "test",
  983. Message: xsql.Message{
  984. "a": "a1",
  985. "c": "c1",
  986. },
  987. },
  988. &xsql.Tuple{
  989. Emitter: "test",
  990. Message: xsql.Message{
  991. "a": "a1",
  992. "b": "b2",
  993. "c": "c2",
  994. },
  995. },
  996. },
  997. result: [][]map[string]interface{}{{{}}, {{
  998. "a": "a1",
  999. "b": "b1",
  1000. }}, {{
  1001. "a": "a1",
  1002. "b": "b2",
  1003. }}, {{
  1004. "a": "a1",
  1005. }}},
  1006. },
  1007. {
  1008. sql: `SELECT lag(a, 2, "a10") as a FROM test`,
  1009. data: []interface{}{
  1010. &xsql.Tuple{
  1011. Emitter: "test",
  1012. Message: xsql.Message{
  1013. "a": "a1",
  1014. "b": "b1",
  1015. },
  1016. },
  1017. &xsql.Tuple{
  1018. Emitter: "test",
  1019. Message: xsql.Message{
  1020. "a": "a2",
  1021. "c": "c1",
  1022. },
  1023. },
  1024. &xsql.Tuple{
  1025. Emitter: "test",
  1026. Message: xsql.Message{
  1027. "a": "a1",
  1028. "c": "c1",
  1029. },
  1030. },
  1031. &xsql.Tuple{
  1032. Emitter: "test",
  1033. Message: xsql.Message{
  1034. "a": "a1",
  1035. "b": "b2",
  1036. "c": "c2",
  1037. },
  1038. },
  1039. },
  1040. result: [][]map[string]interface{}{{{
  1041. "a": "a10",
  1042. }}, {{
  1043. "a": "a10",
  1044. }}, {{
  1045. "a": "a1",
  1046. }}, {{
  1047. "a": "a2",
  1048. }}},
  1049. },
  1050. {
  1051. sql: `SELECT lag(a, 2) as a FROM test`,
  1052. data: []interface{}{
  1053. &xsql.Tuple{
  1054. Emitter: "test",
  1055. Message: xsql.Message{
  1056. "a": "a1",
  1057. "b": "b1",
  1058. },
  1059. },
  1060. &xsql.Tuple{
  1061. Emitter: "test",
  1062. Message: xsql.Message{
  1063. "a": "a2",
  1064. "c": "c1",
  1065. },
  1066. },
  1067. &xsql.Tuple{
  1068. Emitter: "test",
  1069. Message: xsql.Message{
  1070. "a": "a1",
  1071. "c": "c1",
  1072. },
  1073. },
  1074. &xsql.Tuple{
  1075. Emitter: "test",
  1076. Message: xsql.Message{
  1077. "a": "a1",
  1078. "b": "b2",
  1079. "c": "c2",
  1080. },
  1081. },
  1082. },
  1083. result: [][]map[string]interface{}{{{}}, {{}}, {{
  1084. "a": "a1",
  1085. }}, {{
  1086. "a": "a2",
  1087. }}},
  1088. },
  1089. }
  1090. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1091. contextLogger := conf.Log.WithField("rule", "TestChangedFuncs_Apply1")
  1092. for i, tt := range tests {
  1093. tempStore, _ := state.CreateStore("mockRule"+strconv.Itoa(i), api.AtMostOnce)
  1094. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta("mockRule"+strconv.Itoa(i), "project", tempStore)
  1095. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1096. if err != nil || stmt == nil {
  1097. t.Errorf("parse sql %s error %v", tt.sql, err)
  1098. }
  1099. pp := &ProjectOp{}
  1100. parseStmt(pp, stmt.Fields)
  1101. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  1102. r := make([][]map[string]interface{}, 0, len(tt.data))
  1103. for _, d := range tt.data {
  1104. opResult := pp.Apply(ctx, d, fv, afv)
  1105. result, err := parseResult(opResult, pp.IsAggregate)
  1106. if err != nil {
  1107. t.Errorf("parse result error: %s", err)
  1108. continue
  1109. }
  1110. r = append(r, result)
  1111. }
  1112. if !reflect.DeepEqual(tt.result, r) {
  1113. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
  1114. }
  1115. }
  1116. }