misc_func_test.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "reflect"
  18. "strconv"
  19. "strings"
  20. "testing"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/testx"
  23. "github.com/lf-edge/ekuiper/internal/topo/context"
  24. "github.com/lf-edge/ekuiper/internal/topo/state"
  25. "github.com/lf-edge/ekuiper/internal/xsql"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. "github.com/lf-edge/ekuiper/pkg/cast"
  28. )
  29. func TestMiscFunc_Apply1(t *testing.T) {
  30. tests := []struct {
  31. sql string
  32. data *xsql.Tuple
  33. result []map[string]interface{}
  34. }{
  35. {
  36. sql: "SELECT md5(a) AS a FROM test",
  37. data: &xsql.Tuple{
  38. Emitter: "test",
  39. Message: xsql.Message{
  40. "a": "The quick brown fox jumps over the lazy dog",
  41. "b": "myb",
  42. "c": "myc",
  43. },
  44. },
  45. result: []map[string]interface{}{{
  46. "a": strings.ToLower("9E107D9D372BB6826BD81D3542A419D6"),
  47. }},
  48. },
  49. {
  50. sql: "SELECT md5(d) AS a FROM test",
  51. data: &xsql.Tuple{
  52. Emitter: "test",
  53. Message: xsql.Message{
  54. "a": "The quick brown fox jumps over the lazy dog",
  55. "b": "myb",
  56. "c": "myc",
  57. },
  58. },
  59. result: []map[string]interface{}{{}},
  60. },
  61. {
  62. sql: "SELECT sha1(a) AS a FROM test",
  63. data: &xsql.Tuple{
  64. Emitter: "test",
  65. Message: xsql.Message{
  66. "a": "The quick brown fox jumps over the lazy dog",
  67. "b": "myb",
  68. "c": "myc",
  69. },
  70. },
  71. result: []map[string]interface{}{{
  72. "a": strings.ToLower("2FD4E1C67A2D28FCED849EE1BB76E7391B93EB12"),
  73. }},
  74. },
  75. {
  76. sql: "SELECT sha256(a) AS a FROM test",
  77. data: &xsql.Tuple{
  78. Emitter: "test",
  79. Message: xsql.Message{
  80. "a": "The quick brown fox jumps over the lazy dog",
  81. "b": "myb",
  82. "c": "myc",
  83. },
  84. },
  85. result: []map[string]interface{}{{
  86. "a": strings.ToLower("D7A8FBB307D7809469CA9ABCB0082E4F8D5651E46D3CDB762D02D0BF37C9E592"),
  87. }},
  88. },
  89. {
  90. sql: "SELECT sha384(a) AS a FROM test",
  91. data: &xsql.Tuple{
  92. Emitter: "test",
  93. Message: xsql.Message{
  94. "a": "The quick brown fox jumps over the lazy dog",
  95. "b": "myb",
  96. "c": "myc",
  97. },
  98. },
  99. result: []map[string]interface{}{{
  100. "a": strings.ToLower("CA737F1014A48F4C0B6DD43CB177B0AFD9E5169367544C494011E3317DBF9A509CB1E5DC1E85A941BBEE3D7F2AFBC9B1"),
  101. }},
  102. },
  103. {
  104. sql: "SELECT sha512(a) AS a FROM test",
  105. data: &xsql.Tuple{
  106. Emitter: "test",
  107. Message: xsql.Message{
  108. "a": "The quick brown fox jumps over the lazy dog",
  109. "b": "myb",
  110. "c": "myc",
  111. },
  112. },
  113. result: []map[string]interface{}{{
  114. "a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
  115. }},
  116. },
  117. {
  118. sql: "SELECT mqtt(topic) AS a FROM test",
  119. data: &xsql.Tuple{
  120. Emitter: "test",
  121. Message: xsql.Message{},
  122. Metadata: xsql.Metadata{
  123. "topic": "devices/device_001/message",
  124. },
  125. },
  126. result: []map[string]interface{}{{
  127. "a": "devices/device_001/message",
  128. }},
  129. },
  130. {
  131. sql: "SELECT mqtt(topic) AS a FROM test",
  132. data: &xsql.Tuple{
  133. Emitter: "test",
  134. Message: xsql.Message{},
  135. Metadata: xsql.Metadata{
  136. "topic": "devices/device_001/message",
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "a": "devices/device_001/message",
  141. }},
  142. },
  143. {
  144. sql: "SELECT topic, mqtt(topic) AS a FROM test",
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "topic": "fff",
  149. },
  150. Metadata: xsql.Metadata{
  151. "topic": "devices/device_001/message",
  152. },
  153. },
  154. result: []map[string]interface{}{{
  155. "topic": "fff",
  156. "a": "devices/device_001/message",
  157. }},
  158. },
  159. {
  160. sql: "SELECT cardinality(arr) as r FROM test",
  161. data: &xsql.Tuple{
  162. Emitter: "test",
  163. Message: xsql.Message{
  164. "temperature": 43.2,
  165. "arr": []int{},
  166. },
  167. },
  168. result: []map[string]interface{}{{
  169. "r": 0,
  170. }},
  171. },
  172. {
  173. sql: "SELECT cardinality(arr) as r FROM test",
  174. data: &xsql.Tuple{
  175. Emitter: "test",
  176. Message: xsql.Message{
  177. "temperature": 43.2,
  178. "arr": []int{1, 2, 3, 4, 5},
  179. },
  180. },
  181. result: []map[string]interface{}{{
  182. "r": 5,
  183. }},
  184. },
  185. {
  186. sql: "SELECT isNull(arr) as r FROM test",
  187. data: &xsql.Tuple{
  188. Emitter: "test",
  189. Message: xsql.Message{
  190. "temperature": 43.2,
  191. "arr": []int{},
  192. },
  193. },
  194. result: []map[string]interface{}{{
  195. "r": false,
  196. }},
  197. },
  198. {
  199. sql: "SELECT isNull(arr) as r FROM test",
  200. data: &xsql.Tuple{
  201. Emitter: "test",
  202. Message: xsql.Message{
  203. "temperature": 43.2,
  204. "arr": []float64(nil),
  205. },
  206. },
  207. result: []map[string]interface{}{{
  208. "r": true,
  209. }},
  210. },
  211. {
  212. sql: "SELECT isNull(rec) as r FROM test",
  213. data: &xsql.Tuple{
  214. Emitter: "test",
  215. Message: xsql.Message{
  216. "temperature": 43.2,
  217. "rec": map[string]interface{}(nil),
  218. },
  219. },
  220. result: []map[string]interface{}{{
  221. "r": true,
  222. }},
  223. },
  224. {
  225. sql: "SELECT cast(a * 1000, \"datetime\") AS a FROM test",
  226. data: &xsql.Tuple{
  227. Emitter: "test",
  228. Message: xsql.Message{
  229. "a": 1.62000273e+09,
  230. "b": "ya",
  231. "c": "myc",
  232. },
  233. },
  234. result: []map[string]interface{}{{
  235. "a": cast.TimeFromUnixMilli(1.62000273e+12),
  236. }},
  237. },
  238. {
  239. sql: "SELECT rule_id() AS rule_id FROM test",
  240. data: &xsql.Tuple{
  241. Emitter: "test",
  242. Message: xsql.Message{},
  243. },
  244. result: []map[string]interface{}{{
  245. "rule_id": "rule0",
  246. }},
  247. },
  248. }
  249. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  250. contextLogger := conf.Log.WithField("rule", "TestMiscFunc_Apply1")
  251. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  252. ctx = ctx.WithMeta("rule0", "op1", &state.MemoryStore{}).(*context.DefaultContext)
  253. for i, tt := range tests {
  254. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  255. if err != nil || stmt == nil {
  256. t.Errorf("parse sql %s error %v", tt.sql, err)
  257. }
  258. pp := &ProjectOp{}
  259. parseStmt(pp, stmt.Fields)
  260. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  261. opResult := pp.Apply(ctx, tt.data, fv, afv)
  262. result, err := parseResult(opResult, pp.IsAggregate)
  263. if err != nil {
  264. t.Errorf("parse result error: %s", err)
  265. continue
  266. }
  267. if !reflect.DeepEqual(tt.result, result) {
  268. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  269. }
  270. }
  271. }
  272. func TestMqttFunc_Apply2(t *testing.T) {
  273. tests := []struct {
  274. sql string
  275. data *xsql.JoinTuples
  276. result []map[string]interface{}
  277. }{
  278. {
  279. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
  280. data: &xsql.JoinTuples{
  281. Content: []*xsql.JoinTuple{
  282. {
  283. Tuples: []xsql.TupleRow{
  284. &xsql.Tuple{Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  285. &xsql.Tuple{Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  286. },
  287. },
  288. },
  289. },
  290. result: []map[string]interface{}{{
  291. "id1": "1",
  292. "a": "devices/type1/device001",
  293. "b": "devices/type2/device001",
  294. }},
  295. },
  296. }
  297. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  298. contextLogger := conf.Log.WithField("rule", "TestMqttFunc_Apply2")
  299. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  300. for i, tt := range tests {
  301. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  302. if err != nil || stmt == nil {
  303. t.Errorf("parse sql %s error %v", tt.sql, err)
  304. }
  305. pp := &ProjectOp{}
  306. parseStmt(pp, stmt.Fields)
  307. fv, afv := xsql.NewFunctionValuersForOp(nil)
  308. opResult := pp.Apply(ctx, tt.data, fv, afv)
  309. result, err := parseResult(opResult, pp.IsAggregate)
  310. if err != nil {
  311. t.Errorf("parse result error: %s", err)
  312. continue
  313. }
  314. if !reflect.DeepEqual(tt.result, result) {
  315. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  316. }
  317. }
  318. }
  319. func TestMetaFunc_Apply1(t *testing.T) {
  320. tests := []struct {
  321. sql string
  322. data interface{}
  323. result interface{}
  324. }{
  325. {
  326. sql: "SELECT topic, meta(topic) AS a FROM test",
  327. data: &xsql.Tuple{
  328. Emitter: "test",
  329. Message: xsql.Message{
  330. "topic": "fff",
  331. },
  332. Metadata: xsql.Metadata{
  333. "topic": "devices/device_001/message",
  334. },
  335. },
  336. result: []map[string]interface{}{{
  337. "topic": "fff",
  338. "a": "devices/device_001/message",
  339. }},
  340. },
  341. {
  342. sql: "SELECT meta(device) as d, meta(temperature->device) as r FROM test",
  343. data: &xsql.Tuple{
  344. Emitter: "test",
  345. Message: xsql.Message{
  346. "temperature": 43.2,
  347. },
  348. Metadata: xsql.Metadata{
  349. "temperature": map[string]interface{}{
  350. "id": "dfadfasfas",
  351. "device": "device2",
  352. },
  353. "device": "gateway",
  354. },
  355. },
  356. result: []map[string]interface{}{{
  357. "d": "gateway",
  358. "r": "device2",
  359. }},
  360. },
  361. {
  362. sql: "SELECT meta(*) as r FROM test",
  363. data: &xsql.Tuple{
  364. Emitter: "test",
  365. Message: xsql.Message{
  366. "temperature": 43.2,
  367. },
  368. Metadata: xsql.Metadata{
  369. "temperature": map[string]interface{}{
  370. "id": "dfadfasfas",
  371. "device": "device2",
  372. },
  373. "device": "gateway",
  374. },
  375. },
  376. result: []map[string]interface{}{{
  377. "r": map[string]interface{}{
  378. "temperature": map[string]interface{}{
  379. "id": "dfadfasfas",
  380. "device": "device2",
  381. },
  382. "device": "gateway",
  383. },
  384. }},
  385. },
  386. {
  387. sql: "SELECT topic, meta(`Light-diming`->device) AS a FROM test",
  388. data: &xsql.Tuple{
  389. Emitter: "test",
  390. Message: xsql.Message{
  391. "topic": "fff",
  392. },
  393. Metadata: xsql.Metadata{
  394. "Light-diming": map[string]interface{}{
  395. "device": "device2",
  396. },
  397. },
  398. },
  399. result: []map[string]interface{}{{
  400. "topic": "fff",
  401. "a": "device2",
  402. }},
  403. },
  404. }
  405. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  406. contextLogger := conf.Log.WithField("rule", "TestMetaFunc_Apply1")
  407. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  408. for i, tt := range tests {
  409. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  410. if err != nil || stmt == nil {
  411. t.Errorf("parse sql %s error %v", tt.sql, err)
  412. }
  413. pp := &ProjectOp{}
  414. parseStmt(pp, stmt.Fields)
  415. fv, afv := xsql.NewFunctionValuersForOp(nil)
  416. opResult := pp.Apply(ctx, tt.data, fv, afv)
  417. result, err := parseResult(opResult, pp.IsAggregate)
  418. if err != nil {
  419. t.Errorf("parse result error: %s", err)
  420. continue
  421. }
  422. if !reflect.DeepEqual(tt.result, result) {
  423. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  424. }
  425. }
  426. }
  427. func TestJsonPathFunc_Apply1(t *testing.T) {
  428. tests := []struct {
  429. sql string
  430. data interface{}
  431. result interface{}
  432. err string
  433. }{
  434. {
  435. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  436. data: &xsql.Tuple{
  437. Emitter: "test",
  438. Message: xsql.Message{
  439. "class": "warrior",
  440. "equipment": map[string]interface{}{
  441. "rings": []map[string]interface{}{
  442. {
  443. "name": "ring of despair",
  444. "weight": 0.1,
  445. }, {
  446. "name": "ring of strength",
  447. "weight": 2.4,
  448. },
  449. },
  450. "arm_right": "Sword of flame",
  451. "arm_left": "Shield of faith",
  452. },
  453. },
  454. },
  455. result: []map[string]interface{}{{
  456. "a": "Sword of flame",
  457. }},
  458. }, {
  459. sql: `SELECT json_path_query(equipment, "$.rings[*].weight") AS a FROM test`,
  460. data: &xsql.Tuple{
  461. Emitter: "test",
  462. Message: xsql.Message{
  463. "class": "warrior",
  464. "equipment": map[string]interface{}{
  465. "rings": []interface{}{
  466. map[string]interface{}{
  467. "name": "ring of despair",
  468. "weight": 0.1,
  469. }, map[string]interface{}{
  470. "name": "ring of strength",
  471. "weight": 2.4,
  472. },
  473. },
  474. "arm_right": "Sword of flame",
  475. "arm_left": "Shield of faith",
  476. },
  477. },
  478. },
  479. result: []map[string]interface{}{{
  480. "a": []interface{}{
  481. 0.1, 2.4,
  482. },
  483. }},
  484. }, {
  485. sql: `SELECT json_path_query_first(equipment, "$.rings[*].weight") AS a FROM test`,
  486. data: &xsql.Tuple{
  487. Emitter: "test",
  488. Message: xsql.Message{
  489. "class": "warrior",
  490. "equipment": map[string]interface{}{
  491. "rings": []interface{}{
  492. map[string]interface{}{
  493. "name": "ring of despair",
  494. "weight": 0.1,
  495. }, map[string]interface{}{
  496. "name": "ring of strength",
  497. "weight": 2.4,
  498. },
  499. },
  500. "arm_right": "Sword of flame",
  501. "arm_left": "Shield of faith",
  502. },
  503. },
  504. },
  505. result: []map[string]interface{}{{
  506. "a": 0.1,
  507. }},
  508. }, {
  509. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1]") AS a FROM test`,
  510. data: &xsql.Tuple{
  511. Emitter: "test",
  512. Message: xsql.Message{
  513. "class": "warrior",
  514. "equipment": map[string]interface{}{
  515. "rings": []interface{}{
  516. map[string]interface{}{
  517. "name": "ring of despair",
  518. "weight": 0.1,
  519. }, map[string]interface{}{
  520. "name": "ring of strength",
  521. "weight": 2.4,
  522. },
  523. },
  524. "arm_right": "Sword of flame",
  525. "arm_left": "Shield of faith",
  526. },
  527. },
  528. },
  529. result: []map[string]interface{}{{
  530. "a": []interface{}{
  531. map[string]interface{}{
  532. "name": "ring of strength",
  533. "weight": 2.4,
  534. },
  535. },
  536. }},
  537. }, {
  538. sql: `SELECT json_path_query(equipment, "$.rings[? @.weight>1].name") AS a FROM test`,
  539. data: &xsql.Tuple{
  540. Emitter: "test",
  541. Message: xsql.Message{
  542. "class": "warrior",
  543. "equipment": map[string]interface{}{
  544. "rings": []interface{}{
  545. map[string]interface{}{
  546. "name": "ring of despair",
  547. "weight": 0.1,
  548. }, map[string]interface{}{
  549. "name": "ring of strength",
  550. "weight": 2.4,
  551. },
  552. },
  553. "arm_right": "Sword of flame",
  554. "arm_left": "Shield of faith",
  555. },
  556. },
  557. },
  558. result: []map[string]interface{}{{
  559. "a": []interface{}{
  560. "ring of strength",
  561. },
  562. }},
  563. }, {
  564. sql: `SELECT json_path_exists(equipment, "$.rings[? @.weight>5]") AS a FROM test`,
  565. data: &xsql.Tuple{
  566. Emitter: "test",
  567. Message: xsql.Message{
  568. "class": "warrior",
  569. "equipment": map[string]interface{}{
  570. "rings": []interface{}{
  571. map[string]interface{}{
  572. "name": "ring of despair",
  573. "weight": 0.1,
  574. }, map[string]interface{}{
  575. "name": "ring of strength",
  576. "weight": 2.4,
  577. },
  578. },
  579. "arm_right": "Sword of flame",
  580. "arm_left": "Shield of faith",
  581. },
  582. },
  583. },
  584. result: []map[string]interface{}{{
  585. "a": false,
  586. }},
  587. }, {
  588. sql: `SELECT json_path_exists(equipment, "$.ring1") AS a FROM test`,
  589. data: &xsql.Tuple{
  590. Emitter: "test",
  591. Message: xsql.Message{
  592. "class": "warrior",
  593. "equipment": map[string]interface{}{
  594. "rings": []interface{}{
  595. map[string]interface{}{
  596. "name": "ring of despair",
  597. "weight": 0.1,
  598. }, map[string]interface{}{
  599. "name": "ring of strength",
  600. "weight": 2.4,
  601. },
  602. },
  603. "arm_right": "Sword of flame",
  604. "arm_left": "Shield of faith",
  605. },
  606. },
  607. },
  608. result: []map[string]interface{}{{
  609. "a": false,
  610. }},
  611. }, {
  612. sql: `SELECT json_path_exists(equipment, "$.rings") AS a FROM test`,
  613. data: &xsql.Tuple{
  614. Emitter: "test",
  615. Message: xsql.Message{
  616. "class": "warrior",
  617. "equipment": map[string]interface{}{
  618. "rings": []interface{}{
  619. map[string]interface{}{
  620. "name": "ring of despair",
  621. "weight": 0.1,
  622. }, map[string]interface{}{
  623. "name": "ring of strength",
  624. "weight": 2.4,
  625. },
  626. },
  627. "arm_right": "Sword of flame",
  628. "arm_left": "Shield of faith",
  629. },
  630. },
  631. },
  632. result: []map[string]interface{}{{
  633. "a": true,
  634. }},
  635. }, {
  636. sql: `SELECT json_path_query(equipment, "$.rings[? (@.weight>1)].name") AS a FROM test`,
  637. data: &xsql.Tuple{
  638. Emitter: "test",
  639. Message: xsql.Message{
  640. "class": "warrior",
  641. "equipment": map[string]interface{}{
  642. "rings": []map[string]interface{}{
  643. {
  644. "name": "ring of despair",
  645. "weight": 0.1,
  646. }, {
  647. "name": "ring of strength",
  648. "weight": 2.4,
  649. },
  650. },
  651. "arm_right": "Sword of flame",
  652. "arm_left": "Shield of faith",
  653. },
  654. },
  655. },
  656. result: []map[string]interface{}{{
  657. "a": []interface{}{
  658. "ring of strength",
  659. },
  660. }},
  661. }, {
  662. sql: `SELECT json_path_query(equipment, "$.rings[*]") AS a FROM test`,
  663. data: &xsql.Tuple{
  664. Emitter: "test",
  665. Message: xsql.Message{
  666. "class": "warrior",
  667. "equipment": map[string]interface{}{
  668. "rings": []float64{
  669. 0.1, 2.4,
  670. },
  671. "arm_right": "Sword of flame",
  672. "arm_left": "Shield of faith",
  673. },
  674. },
  675. },
  676. result: []map[string]interface{}{{
  677. "a": []interface{}{
  678. 0.1, 2.4,
  679. },
  680. }},
  681. }, {
  682. sql: `SELECT json_path_query(equipment, "$.rings") AS a FROM test`,
  683. data: &xsql.Tuple{
  684. Emitter: "test",
  685. Message: xsql.Message{
  686. "class": "warrior",
  687. "equipment": map[string]interface{}{
  688. "rings": []float64{
  689. 0.1, 2.4,
  690. },
  691. "arm_right": "Sword of flame",
  692. "arm_left": "Shield of faith",
  693. },
  694. },
  695. },
  696. result: []map[string]interface{}{{
  697. "a": []interface{}{
  698. 0.1, 2.4,
  699. },
  700. }},
  701. }, {
  702. sql: `SELECT json_path_query(equipment, "$[0].rings[1]") AS a FROM test`,
  703. data: &xsql.Tuple{
  704. Emitter: "test",
  705. Message: xsql.Message{
  706. "class": "warrior",
  707. "equipment": []map[string]interface{}{
  708. {
  709. "rings": []float64{
  710. 0.1, 2.4,
  711. },
  712. "arm_right": "Sword of flame",
  713. "arm_left": "Shield of faith",
  714. },
  715. },
  716. },
  717. },
  718. result: []map[string]interface{}{{
  719. "a": 2.4,
  720. }},
  721. }, {
  722. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  723. data: &xsql.Tuple{
  724. Emitter: "test",
  725. Message: xsql.Message{
  726. "class": "warrior",
  727. "equipment": []map[string]interface{}{
  728. {
  729. "rings": []float64{
  730. 0.1, 2.4,
  731. },
  732. "arm.right": "Sword of flame",
  733. "arm.left": "Shield of faith",
  734. },
  735. },
  736. },
  737. },
  738. result: []map[string]interface{}{{
  739. "a": "Shield of faith",
  740. }},
  741. }, {
  742. sql: "SELECT json_path_query(equipment, \"$[\\\"arm.left\\\"]\") AS a FROM test",
  743. data: &xsql.Tuple{
  744. Emitter: "test",
  745. Message: xsql.Message{
  746. "class": "warrior",
  747. "equipment": `{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}`,
  748. },
  749. },
  750. result: []map[string]interface{}{{
  751. "a": "Shield of faith",
  752. }},
  753. }, {
  754. sql: "SELECT json_path_query(equipment, \"$[0][\\\"arm.left\\\"]\") AS a FROM test",
  755. data: &xsql.Tuple{
  756. Emitter: "test",
  757. Message: xsql.Message{
  758. "class": "warrior",
  759. "equipment": `[{"rings": [0.1, 2.4],"arm.right": "Sword of flame","arm.left": "Shield of faith"}]`,
  760. },
  761. },
  762. result: []map[string]interface{}{{
  763. "a": "Shield of faith",
  764. }},
  765. }, {
  766. sql: `SELECT all[poi[-1] + 1]->ts as powerOnTs FROM test`,
  767. data: &xsql.Tuple{
  768. Emitter: "test",
  769. Message: xsql.Message{
  770. "all": []map[string]interface{}{
  771. {"SystemPowerMode": 0, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": float64(1), "ts": 0},
  772. {"SystemPowerMode": 0, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": float64(4), "ts": 500},
  773. {"SystemPowerMode": 2, "VehicleSpeed": 0, "FLWdwPosition": 0, "FrontWiperSwitchStatus": 0, "ts": 1000},
  774. {"SystemPowerMode": 2, "VehicleSpeed": 10, "FLWdwPosition": 20, "FrontWiperSwitchStatus": 0, "ts": 60000},
  775. {"SystemPowerMode": 2, "VehicleSpeed": 10, "FLWdwPosition": 20, "FrontWiperSwitchStatus": 0, "ts": 89500},
  776. {"SystemPowerMode": 2, "VehicleSpeed": 20, "FLWdwPosition": 50, "FrontWiperSwitchStatus": 5, "ts": 90000},
  777. {"SystemPowerMode": 2, "VehicleSpeed": 40, "FLWdwPosition": 60, "FrontWiperSwitchStatus": 5, "ts": 121000},
  778. },
  779. "poi": []interface{}{0, 1},
  780. },
  781. },
  782. result: []map[string]interface{}{{
  783. "powerOnTs": 1000,
  784. }},
  785. }, {
  786. sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
  787. data: &xsql.Tuple{
  788. Emitter: "test",
  789. Message: xsql.Message{
  790. "class": "warrior",
  791. "equipment2": map[string]interface{}{
  792. "rings": []map[string]interface{}{
  793. {
  794. "name": "ring of despair",
  795. "weight": 0.1,
  796. }, {
  797. "name": "ring of strength",
  798. "weight": 2.4,
  799. },
  800. },
  801. "arm_right": "Sword of flame",
  802. "arm_left": "Shield of faith",
  803. },
  804. },
  805. },
  806. err: "run Select error: call func json_path_query error: invalid data nil for jsonpath",
  807. },
  808. }
  809. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  810. contextLogger := conf.Log.WithField("rule", "TestJsonFunc_Apply1")
  811. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  812. for i, tt := range tests {
  813. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  814. if err != nil || stmt == nil {
  815. t.Errorf("parse sql %s error %v", tt.sql, err)
  816. }
  817. pp := &ProjectOp{}
  818. parseStmt(pp, stmt.Fields)
  819. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  820. opResult := pp.Apply(ctx, tt.data, fv, afv)
  821. if rt, ok := opResult.(error); ok {
  822. if tt.err == "" {
  823. t.Errorf("%d: got error:\n exp=%s\n got=%s\n\n", i, tt.result, rt)
  824. } else if !reflect.DeepEqual(tt.err, testx.Errstring(rt)) {
  825. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.err, rt)
  826. }
  827. } else {
  828. result, _ := parseResult(opResult, pp.IsAggregate)
  829. if tt.err == "" {
  830. if !reflect.DeepEqual(tt.result, result) {
  831. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  832. }
  833. } else {
  834. t.Errorf("%d: invalid result:\n exp error %s\n got=%s\n\n", i, tt.err, result)
  835. }
  836. }
  837. }
  838. }
  839. func TestChangedFuncs_Apply1(t *testing.T) {
  840. tests := []struct {
  841. sql string
  842. data []interface{}
  843. result [][]map[string]interface{}
  844. }{
  845. {
  846. sql: `SELECT changed_col(true, a), b FROM test`,
  847. data: []interface{}{
  848. &xsql.Tuple{
  849. Emitter: "test",
  850. Message: xsql.Message{
  851. "a": "a1",
  852. "b": "b1",
  853. "c": "c1",
  854. },
  855. },
  856. &xsql.Tuple{
  857. Emitter: "test",
  858. Message: xsql.Message{
  859. "a": "a1",
  860. "b": "b2",
  861. "c": "c1",
  862. },
  863. },
  864. &xsql.Tuple{
  865. Emitter: "test",
  866. Message: xsql.Message{
  867. "a": "a1",
  868. "c": "c1",
  869. },
  870. },
  871. &xsql.Tuple{
  872. Emitter: "test",
  873. Message: xsql.Message{
  874. "a": "a1",
  875. "b": "b2",
  876. "c": "c2",
  877. },
  878. },
  879. },
  880. result: [][]map[string]interface{}{{{
  881. "changed_col": "a1",
  882. "b": "b1",
  883. }}, {{
  884. "b": "b2",
  885. }}, {{}}, {{
  886. "b": "b2",
  887. }}},
  888. }, {
  889. sql: `SELECT changed_col(true, *) FROM test`,
  890. data: []interface{}{
  891. &xsql.Tuple{
  892. Emitter: "test",
  893. Message: xsql.Message{
  894. "a": "a1",
  895. "b": "b1",
  896. },
  897. },
  898. &xsql.Tuple{
  899. Emitter: "test",
  900. Message: xsql.Message{
  901. "a": "a1",
  902. "c": "c1",
  903. },
  904. },
  905. &xsql.Tuple{
  906. Emitter: "test",
  907. Message: xsql.Message{
  908. "a": "a1",
  909. "c": "c1",
  910. },
  911. },
  912. &xsql.Tuple{
  913. Emitter: "test",
  914. Message: xsql.Message{
  915. "a": "a1",
  916. "b": "b2",
  917. "c": "c2",
  918. },
  919. },
  920. },
  921. result: [][]map[string]interface{}{{{
  922. "changed_col": map[string]interface{}{
  923. "a": "a1",
  924. "b": "b1",
  925. },
  926. }}, {{
  927. "changed_col": map[string]interface{}{
  928. "a": "a1",
  929. "c": "c1",
  930. },
  931. }}, {{}}, {{
  932. "changed_col": map[string]interface{}{
  933. "a": "a1",
  934. "b": "b2",
  935. "c": "c2",
  936. },
  937. }}},
  938. },
  939. }
  940. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  941. contextLogger := conf.Log.WithField("rule", "TestChangedFuncs_Apply1")
  942. for i, tt := range tests {
  943. tempStore, _ := state.CreateStore("mockRule"+strconv.Itoa(i), api.AtMostOnce)
  944. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta("mockRule"+strconv.Itoa(i), "project", tempStore)
  945. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  946. if err != nil || stmt == nil {
  947. t.Errorf("parse sql %s error %v", tt.sql, err)
  948. }
  949. pp := &ProjectOp{}
  950. parseStmt(pp, stmt.Fields)
  951. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  952. r := make([][]map[string]interface{}, 0, len(tt.data))
  953. for _, d := range tt.data {
  954. opResult := pp.Apply(ctx, d, fv, afv)
  955. result, err := parseResult(opResult, pp.IsAggregate)
  956. if err != nil {
  957. t.Errorf("parse result error: %s", err)
  958. continue
  959. }
  960. r = append(r, result)
  961. }
  962. if !reflect.DeepEqual(tt.result, r) {
  963. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
  964. }
  965. }
  966. }
  967. func TestLagFuncs_Apply1(t *testing.T) {
  968. tests := []struct {
  969. sql string
  970. data []interface{}
  971. result [][]map[string]interface{}
  972. }{
  973. {
  974. sql: `SELECT lag(a) as a, lag(b) as b FROM test`,
  975. data: []interface{}{
  976. &xsql.Tuple{
  977. Emitter: "test",
  978. Message: xsql.Message{
  979. "a": "a1",
  980. "b": "b1",
  981. "c": "c1",
  982. },
  983. },
  984. &xsql.Tuple{
  985. Emitter: "test",
  986. Message: xsql.Message{
  987. "a": "a1",
  988. "b": "b2",
  989. "c": "c1",
  990. },
  991. },
  992. &xsql.Tuple{
  993. Emitter: "test",
  994. Message: xsql.Message{
  995. "a": "a1",
  996. "c": "c1",
  997. },
  998. },
  999. &xsql.Tuple{
  1000. Emitter: "test",
  1001. Message: xsql.Message{
  1002. "a": "a1",
  1003. "b": "b2",
  1004. "c": "c2",
  1005. },
  1006. },
  1007. },
  1008. result: [][]map[string]interface{}{{{}}, {{
  1009. "a": "a1",
  1010. "b": "b1",
  1011. }}, {{
  1012. "a": "a1",
  1013. "b": "b2",
  1014. }}, {{
  1015. "a": "a1",
  1016. }}},
  1017. },
  1018. {
  1019. sql: `SELECT lag(a, 2, "a10") as a FROM test`,
  1020. data: []interface{}{
  1021. &xsql.Tuple{
  1022. Emitter: "test",
  1023. Message: xsql.Message{
  1024. "a": "a1",
  1025. "b": "b1",
  1026. },
  1027. },
  1028. &xsql.Tuple{
  1029. Emitter: "test",
  1030. Message: xsql.Message{
  1031. "a": "a2",
  1032. "c": "c1",
  1033. },
  1034. },
  1035. &xsql.Tuple{
  1036. Emitter: "test",
  1037. Message: xsql.Message{
  1038. "a": "a1",
  1039. "c": "c1",
  1040. },
  1041. },
  1042. &xsql.Tuple{
  1043. Emitter: "test",
  1044. Message: xsql.Message{
  1045. "a": "a1",
  1046. "b": "b2",
  1047. "c": "c2",
  1048. },
  1049. },
  1050. },
  1051. result: [][]map[string]interface{}{{{
  1052. "a": "a10",
  1053. }}, {{
  1054. "a": "a10",
  1055. }}, {{
  1056. "a": "a1",
  1057. }}, {{
  1058. "a": "a2",
  1059. }}},
  1060. },
  1061. {
  1062. sql: `SELECT lag(a, 2) as a FROM test`,
  1063. data: []interface{}{
  1064. &xsql.Tuple{
  1065. Emitter: "test",
  1066. Message: xsql.Message{
  1067. "a": "a1",
  1068. "b": "b1",
  1069. },
  1070. },
  1071. &xsql.Tuple{
  1072. Emitter: "test",
  1073. Message: xsql.Message{
  1074. "a": "a2",
  1075. "c": "c1",
  1076. },
  1077. },
  1078. &xsql.Tuple{
  1079. Emitter: "test",
  1080. Message: xsql.Message{
  1081. "a": "a1",
  1082. "c": "c1",
  1083. },
  1084. },
  1085. &xsql.Tuple{
  1086. Emitter: "test",
  1087. Message: xsql.Message{
  1088. "a": "a1",
  1089. "b": "b2",
  1090. "c": "c2",
  1091. },
  1092. },
  1093. },
  1094. result: [][]map[string]interface{}{{{}}, {{}}, {{
  1095. "a": "a1",
  1096. }}, {{
  1097. "a": "a2",
  1098. }}},
  1099. },
  1100. }
  1101. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1102. contextLogger := conf.Log.WithField("rule", "TestChangedFuncs_Apply1")
  1103. for i, tt := range tests {
  1104. tempStore, _ := state.CreateStore("mockRule"+strconv.Itoa(i), api.AtMostOnce)
  1105. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta("mockRule"+strconv.Itoa(i), "project", tempStore)
  1106. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1107. if err != nil || stmt == nil {
  1108. t.Errorf("parse sql %s error %v", tt.sql, err)
  1109. }
  1110. pp := &ProjectOp{}
  1111. parseStmt(pp, stmt.Fields)
  1112. fv, afv := xsql.NewFunctionValuersForOp(ctx)
  1113. r := make([][]map[string]interface{}, 0, len(tt.data))
  1114. for _, d := range tt.data {
  1115. opResult := pp.Apply(ctx, d, fv, afv)
  1116. result, err := parseResult(opResult, pp.IsAggregate)
  1117. if err != nil {
  1118. t.Errorf("parse result error: %s", err)
  1119. continue
  1120. }
  1121. r = append(r, result)
  1122. }
  1123. if !reflect.DeepEqual(tt.result, r) {
  1124. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
  1125. }
  1126. }
  1127. }