preprocessor_test.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/topo/context"
  22. "github.com/lf-edge/ekuiper/internal/xsql"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "github.com/lf-edge/ekuiper/pkg/cast"
  25. "github.com/lf-edge/ekuiper/pkg/message"
  26. "io/ioutil"
  27. "log"
  28. "path"
  29. "reflect"
  30. "testing"
  31. "time"
  32. )
  33. func TestPreprocessor_Apply(t *testing.T) {
  34. var tests = []struct {
  35. stmt *ast.StreamStmt
  36. data []byte
  37. result interface{}
  38. }{
  39. //Basic type
  40. {
  41. stmt: &ast.StreamStmt{
  42. Name: ast.StreamName("demo"),
  43. StreamFields: []ast.StreamField{
  44. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  45. },
  46. },
  47. data: []byte(`{"a": 6}`),
  48. result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
  49. },
  50. {
  51. stmt: &ast.StreamStmt{
  52. Name: ast.StreamName("demo"),
  53. StreamFields: []ast.StreamField{
  54. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  55. },
  56. },
  57. data: []byte(`{"abc": null}`),
  58. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found <nil>(<nil>)"),
  59. },
  60. {
  61. stmt: &ast.StreamStmt{
  62. Name: ast.StreamName("demo"),
  63. StreamFields: nil,
  64. },
  65. data: []byte(`{"a": 6}`),
  66. result: &xsql.Tuple{Message: xsql.Message{
  67. "a": float64(6),
  68. },
  69. },
  70. },
  71. {
  72. stmt: &ast.StreamStmt{
  73. Name: ast.StreamName("demo"),
  74. StreamFields: []ast.StreamField{
  75. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  76. },
  77. },
  78. data: []byte(`{"abc": 6}`),
  79. result: &xsql.Tuple{Message: xsql.Message{
  80. "abc": 6,
  81. },
  82. },
  83. },
  84. {
  85. stmt: &ast.StreamStmt{
  86. Name: ast.StreamName("demo"),
  87. StreamFields: nil,
  88. },
  89. data: []byte(`{"abc": 6}`),
  90. result: &xsql.Tuple{Message: xsql.Message{
  91. "abc": float64(6),
  92. },
  93. },
  94. },
  95. {
  96. stmt: &ast.StreamStmt{
  97. Name: ast.StreamName("demo"),
  98. StreamFields: []ast.StreamField{
  99. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  100. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  101. },
  102. },
  103. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  104. result: &xsql.Tuple{Message: xsql.Message{
  105. "abc": float64(34),
  106. "def": "hello",
  107. },
  108. },
  109. },
  110. {
  111. stmt: &ast.StreamStmt{
  112. Name: ast.StreamName("demo"),
  113. StreamFields: nil,
  114. },
  115. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  116. result: &xsql.Tuple{Message: xsql.Message{
  117. "abc": float64(34),
  118. "def": "hello",
  119. "ghi": float64(50),
  120. },
  121. },
  122. },
  123. {
  124. stmt: &ast.StreamStmt{
  125. Name: ast.StreamName("demo"),
  126. StreamFields: []ast.StreamField{
  127. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  128. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  129. },
  130. },
  131. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  132. result: &xsql.Tuple{Message: xsql.Message{
  133. "abc": float64(34),
  134. "def": "hello",
  135. },
  136. },
  137. },
  138. {
  139. stmt: &ast.StreamStmt{
  140. Name: ast.StreamName("demo"),
  141. StreamFields: []ast.StreamField{
  142. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  143. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  144. },
  145. },
  146. data: []byte(`{"abc": 77, "def" : "hello"}`),
  147. result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
  148. },
  149. {
  150. stmt: &ast.StreamStmt{
  151. Name: ast.StreamName("demo"),
  152. StreamFields: []ast.StreamField{
  153. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  154. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  155. },
  156. },
  157. data: []byte(`{"a": {"b" : "hello"}}`),
  158. result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
  159. },
  160. {
  161. stmt: &ast.StreamStmt{
  162. Name: ast.StreamName("demo"),
  163. StreamFields: nil,
  164. },
  165. data: []byte(`{"a": {"b" : "hello"}}`),
  166. result: &xsql.Tuple{Message: xsql.Message{
  167. "a": map[string]interface{}{
  168. "b": "hello",
  169. },
  170. },
  171. },
  172. },
  173. //Rec type
  174. {
  175. stmt: &ast.StreamStmt{
  176. Name: ast.StreamName("demo"),
  177. StreamFields: []ast.StreamField{
  178. {Name: "a", FieldType: &ast.RecType{
  179. StreamFields: []ast.StreamField{
  180. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  181. },
  182. }},
  183. },
  184. },
  185. data: []byte(`{"a": {"b" : "hello"}}`),
  186. result: &xsql.Tuple{Message: xsql.Message{
  187. "a": map[string]interface{}{
  188. "b": "hello",
  189. },
  190. },
  191. },
  192. },
  193. {
  194. stmt: &ast.StreamStmt{
  195. Name: ast.StreamName("demo"),
  196. StreamFields: []ast.StreamField{
  197. {Name: "a", FieldType: &ast.RecType{
  198. StreamFields: []ast.StreamField{
  199. {Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  200. },
  201. }},
  202. },
  203. },
  204. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  205. result: &xsql.Tuple{Message: xsql.Message{
  206. "a": map[string]interface{}{
  207. "b": float64(32),
  208. },
  209. },
  210. },
  211. },
  212. {
  213. stmt: &ast.StreamStmt{
  214. Name: ast.StreamName("demo"),
  215. StreamFields: nil,
  216. },
  217. data: []byte(`{"a": {"b" : "32"}}`),
  218. result: &xsql.Tuple{Message: xsql.Message{
  219. "a": map[string]interface{}{
  220. "b": "32",
  221. },
  222. },
  223. },
  224. },
  225. //Array of complex type
  226. {
  227. stmt: &ast.StreamStmt{
  228. Name: ast.StreamName("demo"),
  229. StreamFields: []ast.StreamField{
  230. {Name: "a", FieldType: &ast.ArrayType{
  231. Type: ast.STRUCT,
  232. FieldType: &ast.RecType{
  233. StreamFields: []ast.StreamField{
  234. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  235. },
  236. },
  237. }},
  238. },
  239. },
  240. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  241. result: &xsql.Tuple{Message: xsql.Message{
  242. "a": []map[string]interface{}{
  243. {"b": "hello1"},
  244. {"b": "hello2"},
  245. },
  246. },
  247. },
  248. },
  249. {
  250. stmt: &ast.StreamStmt{
  251. Name: ast.StreamName("demo"),
  252. StreamFields: []ast.StreamField{
  253. {Name: "a", FieldType: &ast.ArrayType{
  254. Type: ast.STRUCT,
  255. FieldType: &ast.RecType{
  256. StreamFields: []ast.StreamField{
  257. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  258. },
  259. },
  260. }},
  261. },
  262. },
  263. data: []byte(`{"a": []}`),
  264. result: &xsql.Tuple{Message: xsql.Message{
  265. "a": make([]map[string]interface{}, 0),
  266. },
  267. },
  268. },
  269. {
  270. stmt: &ast.StreamStmt{
  271. Name: ast.StreamName("demo"),
  272. StreamFields: []ast.StreamField{
  273. {Name: "a", FieldType: &ast.ArrayType{
  274. Type: ast.STRUCT,
  275. FieldType: &ast.RecType{
  276. StreamFields: []ast.StreamField{
  277. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  278. },
  279. },
  280. }},
  281. },
  282. },
  283. data: []byte(`{"a": null}`),
  284. result: &xsql.Tuple{Message: xsql.Message{
  285. "a": []map[string]interface{}(nil),
  286. },
  287. },
  288. },
  289. {
  290. stmt: &ast.StreamStmt{
  291. Name: ast.StreamName("demo"),
  292. StreamFields: []ast.StreamField{
  293. {Name: "a", FieldType: &ast.ArrayType{
  294. Type: ast.STRUCT,
  295. FieldType: &ast.RecType{
  296. StreamFields: []ast.StreamField{
  297. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  298. },
  299. },
  300. }},
  301. },
  302. },
  303. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  304. result: &xsql.Tuple{Message: xsql.Message{
  305. "a": []map[string]interface{}{
  306. nil,
  307. {"b": "hello2"},
  308. },
  309. },
  310. },
  311. },
  312. {
  313. stmt: &ast.StreamStmt{
  314. Name: ast.StreamName("demo"),
  315. StreamFields: []ast.StreamField{
  316. {Name: "a", FieldType: &ast.ArrayType{
  317. Type: ast.ARRAY,
  318. FieldType: &ast.ArrayType{
  319. Type: ast.BIGINT,
  320. },
  321. }},
  322. },
  323. },
  324. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  325. result: &xsql.Tuple{Message: xsql.Message{
  326. "a": [][]int{
  327. {50, 60, 70},
  328. {66},
  329. {77},
  330. },
  331. },
  332. },
  333. },
  334. {
  335. stmt: &ast.StreamStmt{
  336. Name: ast.StreamName("demo"),
  337. StreamFields: []ast.StreamField{
  338. {Name: "a", FieldType: &ast.ArrayType{
  339. Type: ast.ARRAY,
  340. FieldType: &ast.ArrayType{
  341. Type: ast.BIGINT,
  342. },
  343. }},
  344. },
  345. },
  346. data: []byte(`{"a": [null, [66], [77]]}`),
  347. result: &xsql.Tuple{Message: xsql.Message{
  348. "a": [][]int{
  349. []int(nil),
  350. {66},
  351. {77},
  352. },
  353. },
  354. },
  355. },
  356. {
  357. stmt: &ast.StreamStmt{
  358. Name: ast.StreamName("demo"),
  359. StreamFields: nil,
  360. },
  361. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  362. result: &xsql.Tuple{Message: xsql.Message{
  363. "a": []interface{}{
  364. map[string]interface{}{"b": "hello1"},
  365. map[string]interface{}{"b": "hello2"},
  366. },
  367. },
  368. },
  369. },
  370. {
  371. stmt: &ast.StreamStmt{
  372. Name: ast.StreamName("demo"),
  373. StreamFields: []ast.StreamField{
  374. {Name: "a", FieldType: &ast.ArrayType{
  375. Type: ast.FLOAT,
  376. }},
  377. },
  378. },
  379. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  380. result: &xsql.Tuple{Message: xsql.Message{
  381. "a": []float64{
  382. 55,
  383. 77,
  384. },
  385. },
  386. },
  387. },
  388. {
  389. stmt: &ast.StreamStmt{
  390. Name: ast.StreamName("demo"),
  391. StreamFields: nil,
  392. },
  393. data: []byte(`{"a": [55, 77]}`),
  394. result: &xsql.Tuple{Message: xsql.Message{
  395. "a": []interface{}{
  396. float64(55),
  397. float64(77),
  398. },
  399. },
  400. },
  401. },
  402. //Rec of complex type
  403. {
  404. stmt: &ast.StreamStmt{
  405. Name: ast.StreamName("demo"),
  406. StreamFields: []ast.StreamField{
  407. {Name: "a", FieldType: &ast.RecType{
  408. StreamFields: []ast.StreamField{
  409. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  410. {Name: "c", FieldType: &ast.RecType{
  411. StreamFields: []ast.StreamField{
  412. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  413. },
  414. }},
  415. },
  416. }},
  417. },
  418. },
  419. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  420. result: &xsql.Tuple{Message: xsql.Message{
  421. "a": map[string]interface{}{
  422. "b": "hello",
  423. "c": map[string]interface{}{
  424. "d": 35,
  425. },
  426. },
  427. },
  428. },
  429. },
  430. {
  431. stmt: &ast.StreamStmt{
  432. Name: ast.StreamName("demo"),
  433. StreamFields: []ast.StreamField{
  434. {Name: "a", FieldType: &ast.RecType{
  435. StreamFields: []ast.StreamField{
  436. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  437. {Name: "c", FieldType: &ast.RecType{
  438. StreamFields: []ast.StreamField{
  439. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  440. },
  441. }},
  442. },
  443. }},
  444. },
  445. },
  446. data: []byte(`{"a": null}`),
  447. result: &xsql.Tuple{Message: xsql.Message{
  448. "a": map[string]interface{}(nil),
  449. },
  450. },
  451. },
  452. {
  453. stmt: &ast.StreamStmt{
  454. Name: ast.StreamName("demo"),
  455. StreamFields: []ast.StreamField{
  456. {Name: "a", FieldType: &ast.RecType{
  457. StreamFields: []ast.StreamField{
  458. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  459. {Name: "c", FieldType: &ast.ArrayType{
  460. Type: ast.FLOAT,
  461. }},
  462. },
  463. }},
  464. },
  465. },
  466. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  467. result: &xsql.Tuple{Message: xsql.Message{
  468. "a": map[string]interface{}{
  469. "b": "hello",
  470. "c": []float64{
  471. 35.2, 38.2,
  472. },
  473. },
  474. },
  475. },
  476. },
  477. {
  478. stmt: &ast.StreamStmt{
  479. Name: ast.StreamName("demo"),
  480. StreamFields: []ast.StreamField{
  481. {Name: "a", FieldType: &ast.RecType{
  482. StreamFields: []ast.StreamField{
  483. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  484. {Name: "c", FieldType: &ast.ArrayType{
  485. Type: ast.FLOAT,
  486. }},
  487. },
  488. }},
  489. },
  490. },
  491. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  492. result: &xsql.Tuple{Message: xsql.Message{
  493. "a": map[string]interface{}{
  494. "b": "hello",
  495. "c": []float64(nil),
  496. },
  497. },
  498. },
  499. },
  500. {
  501. stmt: &ast.StreamStmt{
  502. Name: ast.StreamName("demo"),
  503. StreamFields: []ast.StreamField{
  504. {Name: "a", FieldType: &ast.RecType{
  505. StreamFields: []ast.StreamField{
  506. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  507. {Name: "c", FieldType: &ast.ArrayType{
  508. Type: ast.FLOAT,
  509. }},
  510. },
  511. }},
  512. },
  513. },
  514. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  515. result: errors.New("error in preprocessor: fail to parse field c: invalid data type for [0], expect float but found <nil>(<nil>)"),
  516. },
  517. {
  518. stmt: &ast.StreamStmt{
  519. Name: ast.StreamName("demo"),
  520. StreamFields: nil,
  521. },
  522. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  523. result: &xsql.Tuple{Message: xsql.Message{
  524. "a": map[string]interface{}{
  525. "b": "hello",
  526. "c": map[string]interface{}{
  527. "d": 35.2,
  528. },
  529. },
  530. },
  531. },
  532. },
  533. }
  534. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  535. defer conf.CloseLogger()
  536. contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
  537. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  538. for i, tt := range tests {
  539. pp := &Preprocessor{}
  540. pp.streamFields = convertFields(tt.stmt.StreamFields)
  541. dm := make(map[string]interface{})
  542. if e := json.Unmarshal(tt.data, &dm); e != nil {
  543. log.Fatal(e)
  544. return
  545. } else {
  546. tuple := &xsql.Tuple{Message: dm}
  547. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  548. result := pp.Apply(ctx, tuple, fv, afv)
  549. if !reflect.DeepEqual(tt.result, result) {
  550. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  551. }
  552. }
  553. }
  554. }
  555. func TestPreprocessorTime_Apply(t *testing.T) {
  556. var tests = []struct {
  557. stmt *ast.StreamStmt
  558. data []byte
  559. result interface{}
  560. }{
  561. {
  562. stmt: &ast.StreamStmt{
  563. Name: ast.StreamName("demo"),
  564. StreamFields: []ast.StreamField{
  565. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  566. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  567. },
  568. },
  569. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  570. result: &xsql.Tuple{Message: xsql.Message{
  571. "abc": cast.TimeFromUnixMilli(1568854515000),
  572. "def": cast.TimeFromUnixMilli(1568854573431),
  573. },
  574. },
  575. },
  576. {
  577. stmt: &ast.StreamStmt{
  578. Name: ast.StreamName("demo"),
  579. StreamFields: nil,
  580. },
  581. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  582. result: &xsql.Tuple{Message: xsql.Message{
  583. "abc": "2019-09-19T00:55:15.000Z",
  584. "def": float64(1568854573431),
  585. },
  586. },
  587. },
  588. {
  589. stmt: &ast.StreamStmt{
  590. Name: ast.StreamName("demo"),
  591. StreamFields: []ast.StreamField{
  592. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  593. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  594. },
  595. },
  596. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  597. result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  598. },
  599. {
  600. stmt: &ast.StreamStmt{
  601. Name: ast.StreamName("demo"),
  602. StreamFields: []ast.StreamField{
  603. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  604. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  605. },
  606. Options: &ast.Options{
  607. DATASOURCE: "users",
  608. FORMAT: "JSON",
  609. KEY: "USERID",
  610. CONF_KEY: "srv1",
  611. TYPE: "MQTT",
  612. TIMESTAMP: "USERID",
  613. TIMESTAMP_FORMAT: "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  614. },
  615. },
  616. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  617. result: &xsql.Tuple{Message: xsql.Message{
  618. "abc": cast.TimeFromUnixMilli(1568894115000),
  619. "def": cast.TimeFromUnixMilli(1568854573431),
  620. }},
  621. },
  622. //Array type
  623. {
  624. stmt: &ast.StreamStmt{
  625. Name: ast.StreamName("demo"),
  626. StreamFields: []ast.StreamField{
  627. {Name: "a", FieldType: &ast.ArrayType{
  628. Type: ast.DATETIME,
  629. }},
  630. },
  631. },
  632. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  633. result: &xsql.Tuple{Message: xsql.Message{
  634. "a": []time.Time{
  635. cast.TimeFromUnixMilli(1568854515123),
  636. cast.TimeFromUnixMilli(1568854573431),
  637. },
  638. },
  639. },
  640. },
  641. {
  642. stmt: &ast.StreamStmt{
  643. Name: ast.StreamName("demo"),
  644. StreamFields: []ast.StreamField{
  645. {Name: "a", FieldType: &ast.RecType{
  646. StreamFields: []ast.StreamField{
  647. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  648. {Name: "c", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  649. },
  650. }},
  651. },
  652. },
  653. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  654. result: &xsql.Tuple{Message: xsql.Message{
  655. "a": map[string]interface{}{
  656. "b": "hello",
  657. "c": cast.TimeFromUnixMilli(1568854515000),
  658. },
  659. },
  660. },
  661. },
  662. }
  663. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  664. defer conf.CloseLogger()
  665. contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
  666. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  667. for i, tt := range tests {
  668. pp := &Preprocessor{}
  669. pp.streamFields = convertFields(tt.stmt.StreamFields)
  670. if tt.stmt.Options != nil {
  671. pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
  672. }
  673. dm := make(map[string]interface{})
  674. if e := json.Unmarshal(tt.data, &dm); e != nil {
  675. log.Fatal(e)
  676. return
  677. } else {
  678. tuple := &xsql.Tuple{Message: dm}
  679. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  680. result := pp.Apply(ctx, tuple, fv, afv)
  681. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  682. if rt, ok := result.(*xsql.Tuple); ok {
  683. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  684. rt.Message["abc"] = rtt.UTC()
  685. }
  686. }
  687. if !reflect.DeepEqual(tt.result, result) {
  688. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  689. }
  690. }
  691. }
  692. }
  693. func convertFields(o ast.StreamFields) []interface{} {
  694. if o == nil {
  695. return nil
  696. }
  697. fields := make([]interface{}, len(o))
  698. for i := range o {
  699. fields[i] = &o[i]
  700. }
  701. return fields
  702. }
  703. func TestPreprocessorEventtime_Apply(t *testing.T) {
  704. var tests = []struct {
  705. stmt *ast.StreamStmt
  706. data []byte
  707. result interface{}
  708. }{
  709. //Basic type
  710. {
  711. stmt: &ast.StreamStmt{
  712. Name: ast.StreamName("demo"),
  713. StreamFields: []ast.StreamField{
  714. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  715. },
  716. Options: &ast.Options{
  717. DATASOURCE: "users",
  718. FORMAT: "JSON",
  719. KEY: "USERID",
  720. CONF_KEY: "srv1",
  721. TYPE: "MQTT",
  722. TIMESTAMP: "abc",
  723. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  724. },
  725. },
  726. data: []byte(`{"abc": 1568854515000}`),
  727. result: &xsql.Tuple{Message: xsql.Message{
  728. "abc": 1568854515000,
  729. }, Timestamp: 1568854515000,
  730. },
  731. },
  732. {
  733. stmt: &ast.StreamStmt{
  734. Name: ast.StreamName("demo"),
  735. StreamFields: nil,
  736. Options: &ast.Options{
  737. DATASOURCE: "users",
  738. FORMAT: "JSON",
  739. KEY: "USERID",
  740. CONF_KEY: "srv1",
  741. TYPE: "MQTT",
  742. TIMESTAMP: "abc",
  743. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  744. },
  745. },
  746. data: []byte(`{"abc": 1568854515000}`),
  747. result: &xsql.Tuple{Message: xsql.Message{
  748. "abc": float64(1568854515000),
  749. }, Timestamp: 1568854515000,
  750. },
  751. },
  752. {
  753. stmt: &ast.StreamStmt{
  754. Name: ast.StreamName("demo"),
  755. StreamFields: []ast.StreamField{
  756. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  757. },
  758. Options: &ast.Options{
  759. DATASOURCE: "users",
  760. TIMESTAMP: "abc",
  761. },
  762. },
  763. data: []byte(`{"abc": true}`),
  764. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  765. },
  766. {
  767. stmt: &ast.StreamStmt{
  768. Name: ast.StreamName("demo"),
  769. StreamFields: []ast.StreamField{
  770. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  771. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  772. },
  773. Options: &ast.Options{
  774. DATASOURCE: "users",
  775. TIMESTAMP: "def",
  776. },
  777. },
  778. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  779. result: &xsql.Tuple{Message: xsql.Message{
  780. "abc": float64(34),
  781. "def": "2019-09-23T02:47:29.754Z",
  782. }, Timestamp: int64(1569206849754),
  783. },
  784. },
  785. {
  786. stmt: &ast.StreamStmt{
  787. Name: ast.StreamName("demo"),
  788. StreamFields: []ast.StreamField{
  789. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  790. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  791. },
  792. Options: &ast.Options{
  793. DATASOURCE: "users",
  794. TIMESTAMP: "abc",
  795. },
  796. },
  797. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  798. result: &xsql.Tuple{Message: xsql.Message{
  799. "abc": cast.TimeFromUnixMilli(1568854515000),
  800. "def": cast.TimeFromUnixMilli(1568854573431),
  801. }, Timestamp: int64(1568854515000),
  802. },
  803. },
  804. {
  805. stmt: &ast.StreamStmt{
  806. Name: ast.StreamName("demo"),
  807. StreamFields: []ast.StreamField{
  808. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  809. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  810. },
  811. Options: &ast.Options{
  812. DATASOURCE: "users",
  813. TIMESTAMP: "def",
  814. TIMESTAMP_FORMAT: "yyyy-MM-dd'AT'HH:mm:ss",
  815. },
  816. },
  817. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  818. result: &xsql.Tuple{Message: xsql.Message{
  819. "abc": float64(34),
  820. "def": "2019-09-23AT02:47:29",
  821. }, Timestamp: int64(1569206849000),
  822. },
  823. },
  824. {
  825. stmt: &ast.StreamStmt{
  826. Name: ast.StreamName("demo"),
  827. StreamFields: []ast.StreamField{
  828. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  829. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  830. },
  831. Options: &ast.Options{
  832. DATASOURCE: "users",
  833. TIMESTAMP: "def",
  834. TIMESTAMP_FORMAT: "yyyy-MM-ddaHH:mm:ss",
  835. },
  836. },
  837. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  838. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
  839. },
  840. }
  841. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  842. defer conf.CloseLogger()
  843. contextLogger := conf.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  844. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  845. for i, tt := range tests {
  846. pp := &Preprocessor{
  847. defaultFieldProcessor: defaultFieldProcessor{
  848. streamFields: convertFields(tt.stmt.StreamFields),
  849. isBinary: false,
  850. timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
  851. },
  852. isEventTime: true,
  853. timestampField: tt.stmt.Options.TIMESTAMP,
  854. }
  855. dm := make(map[string]interface{})
  856. if e := json.Unmarshal(tt.data, &dm); e != nil {
  857. log.Fatal(e)
  858. return
  859. } else {
  860. tuple := &xsql.Tuple{Message: dm}
  861. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  862. result := pp.Apply(ctx, tuple, fv, afv)
  863. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  864. if rt, ok := result.(*xsql.Tuple); ok {
  865. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  866. rt.Message["abc"] = rtt.UTC()
  867. }
  868. }
  869. if !reflect.DeepEqual(tt.result, result) {
  870. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  871. }
  872. }
  873. }
  874. }
  875. func TestPreprocessorError(t *testing.T) {
  876. tests := []struct {
  877. stmt *ast.StreamStmt
  878. data []byte
  879. result interface{}
  880. }{
  881. {
  882. stmt: &ast.StreamStmt{
  883. Name: ast.StreamName("demo"),
  884. StreamFields: []ast.StreamField{
  885. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  886. },
  887. },
  888. data: []byte(`{"abc": "dafsad"}`),
  889. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
  890. }, {
  891. stmt: &ast.StreamStmt{
  892. Name: ast.StreamName("demo"),
  893. StreamFields: []ast.StreamField{
  894. {Name: "a", FieldType: &ast.RecType{
  895. StreamFields: []ast.StreamField{
  896. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  897. },
  898. }},
  899. },
  900. },
  901. data: []byte(`{"a": {"d" : "hello"}}`),
  902. result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
  903. }, {
  904. stmt: &ast.StreamStmt{
  905. Name: ast.StreamName("demo"),
  906. StreamFields: []ast.StreamField{
  907. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  908. },
  909. Options: &ast.Options{
  910. DATASOURCE: "users",
  911. FORMAT: "JSON",
  912. KEY: "USERID",
  913. CONF_KEY: "srv1",
  914. TYPE: "MQTT",
  915. TIMESTAMP: "abc",
  916. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  917. },
  918. },
  919. data: []byte(`{"abc": "not a time"}`),
  920. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
  921. },
  922. }
  923. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  924. defer conf.CloseLogger()
  925. contextLogger := conf.Log.WithField("rule", "TestPreprocessorError")
  926. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  927. for i, tt := range tests {
  928. pp := &Preprocessor{}
  929. pp.streamFields = convertFields(tt.stmt.StreamFields)
  930. dm := make(map[string]interface{})
  931. if e := json.Unmarshal(tt.data, &dm); e != nil {
  932. log.Fatal(e)
  933. return
  934. } else {
  935. tuple := &xsql.Tuple{Message: dm}
  936. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  937. result := pp.Apply(ctx, tuple, fv, afv)
  938. if !reflect.DeepEqual(tt.result, result) {
  939. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  940. }
  941. }
  942. }
  943. }
  944. func TestPreprocessorForBinary(t *testing.T) {
  945. docsFolder, err := conf.GetLoc("docs/")
  946. if err != nil {
  947. t.Errorf("Cannot find docs folder: %v", err)
  948. }
  949. image, err := ioutil.ReadFile(path.Join(docsFolder, "cover.jpg"))
  950. if err != nil {
  951. t.Errorf("Cannot read image: %v", err)
  952. }
  953. b64img := base64.StdEncoding.EncodeToString(image)
  954. //TODO test bytea type conversion to string or else
  955. var tests = []struct {
  956. stmt *ast.StreamStmt
  957. data []byte
  958. isBinary bool
  959. result interface{}
  960. }{
  961. {
  962. stmt: &ast.StreamStmt{
  963. Name: ast.StreamName("demo"),
  964. StreamFields: nil,
  965. },
  966. data: image,
  967. isBinary: true,
  968. result: &xsql.Tuple{Message: xsql.Message{
  969. "self": image,
  970. },
  971. },
  972. },
  973. {
  974. stmt: &ast.StreamStmt{
  975. Name: ast.StreamName("demo"),
  976. StreamFields: []ast.StreamField{
  977. {Name: "img", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  978. },
  979. },
  980. data: image,
  981. isBinary: true,
  982. result: &xsql.Tuple{Message: xsql.Message{
  983. "img": image,
  984. },
  985. },
  986. },
  987. {
  988. stmt: &ast.StreamStmt{
  989. Name: ast.StreamName("demo"),
  990. StreamFields: []ast.StreamField{
  991. {Name: "a", FieldType: &ast.RecType{
  992. StreamFields: []ast.StreamField{
  993. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  994. },
  995. }},
  996. },
  997. },
  998. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  999. result: &xsql.Tuple{Message: xsql.Message{
  1000. "a": map[string]interface{}{
  1001. "b": image,
  1002. },
  1003. },
  1004. },
  1005. },
  1006. {
  1007. stmt: &ast.StreamStmt{
  1008. Name: ast.StreamName("demo"),
  1009. StreamFields: []ast.StreamField{
  1010. {Name: "a", FieldType: &ast.ArrayType{
  1011. Type: ast.BYTEA,
  1012. }},
  1013. },
  1014. },
  1015. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  1016. result: &xsql.Tuple{Message: xsql.Message{
  1017. "a": [][]byte{
  1018. image,
  1019. },
  1020. },
  1021. },
  1022. },
  1023. {
  1024. stmt: &ast.StreamStmt{
  1025. Name: ast.StreamName("demo"),
  1026. StreamFields: []ast.StreamField{
  1027. {Name: "a", FieldType: &ast.ArrayType{
  1028. Type: ast.STRUCT,
  1029. FieldType: &ast.RecType{
  1030. StreamFields: []ast.StreamField{
  1031. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  1032. },
  1033. },
  1034. }},
  1035. },
  1036. },
  1037. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  1038. result: &xsql.Tuple{Message: xsql.Message{
  1039. "a": []map[string]interface{}{
  1040. {"b": image},
  1041. },
  1042. },
  1043. },
  1044. },
  1045. }
  1046. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1047. defer conf.CloseLogger()
  1048. contextLogger := conf.Log.WithField("rule", "TestPreprocessorForBinary")
  1049. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1050. for i, tt := range tests {
  1051. pp := &Preprocessor{}
  1052. pp.streamFields = convertFields(tt.stmt.StreamFields)
  1053. pp.isBinary = tt.isBinary
  1054. format := "json"
  1055. if tt.isBinary {
  1056. format = "binary"
  1057. }
  1058. if dm, e := message.Decode(tt.data, format); e != nil {
  1059. log.Fatal(e)
  1060. return
  1061. } else {
  1062. tuple := &xsql.Tuple{Message: dm}
  1063. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1064. result := pp.Apply(ctx, tuple, fv, afv)
  1065. if !reflect.DeepEqual(tt.result, result) {
  1066. t.Errorf("%d. %q\n\nresult mismatch", i, tuple)
  1067. }
  1068. }
  1069. }
  1070. }