preprocessor_test.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "log"
  21. "os"
  22. "path"
  23. "reflect"
  24. "testing"
  25. "time"
  26. "github.com/lf-edge/ekuiper/internal/conf"
  27. "github.com/lf-edge/ekuiper/internal/converter"
  28. "github.com/lf-edge/ekuiper/internal/topo/context"
  29. "github.com/lf-edge/ekuiper/internal/xsql"
  30. "github.com/lf-edge/ekuiper/pkg/ast"
  31. "github.com/lf-edge/ekuiper/pkg/cast"
  32. "github.com/lf-edge/ekuiper/pkg/message"
  33. )
  34. func TestPreprocessor_Apply(t *testing.T) {
  35. tests := []struct {
  36. stmt *ast.StreamStmt
  37. data []byte
  38. result interface{}
  39. }{
  40. // Basic type
  41. { // 0
  42. stmt: &ast.StreamStmt{
  43. Name: ast.StreamName("demo"),
  44. StreamFields: []ast.StreamField{
  45. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  46. },
  47. },
  48. data: []byte(`{"a": 6}`),
  49. result: errors.New("error in preprocessor: field abc is not found"),
  50. },
  51. { // 1
  52. stmt: &ast.StreamStmt{
  53. Name: ast.StreamName("demo"),
  54. StreamFields: []ast.StreamField{
  55. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  56. },
  57. },
  58. data: []byte(`{"abc": null}`),
  59. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert <nil>(<nil>) to int64"),
  60. },
  61. { // 2
  62. stmt: &ast.StreamStmt{
  63. Name: ast.StreamName("demo"),
  64. StreamFields: nil,
  65. },
  66. data: []byte(`{"a": 6}`),
  67. result: &xsql.Tuple{
  68. Message: xsql.Message{
  69. "a": float64(6),
  70. },
  71. },
  72. },
  73. { // 3
  74. stmt: &ast.StreamStmt{
  75. Name: ast.StreamName("demo"),
  76. StreamFields: []ast.StreamField{
  77. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  78. },
  79. },
  80. data: []byte(`{"abc": 6}`),
  81. result: &xsql.Tuple{
  82. Message: xsql.Message{
  83. "abc": int64(6),
  84. },
  85. },
  86. },
  87. { // 4
  88. stmt: &ast.StreamStmt{
  89. Name: ast.StreamName("demo"),
  90. StreamFields: nil,
  91. },
  92. data: []byte(`{"abc": 6}`),
  93. result: &xsql.Tuple{
  94. Message: xsql.Message{
  95. "abc": float64(6),
  96. },
  97. },
  98. },
  99. { // 5
  100. stmt: &ast.StreamStmt{
  101. Name: ast.StreamName("demo"),
  102. StreamFields: []ast.StreamField{
  103. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  104. {Name: "dEf", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  105. },
  106. },
  107. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  108. result: &xsql.Tuple{
  109. Message: xsql.Message{
  110. "abc": float64(34),
  111. "dEf": "hello",
  112. "def": "hello",
  113. "ghi": float64(50),
  114. },
  115. },
  116. },
  117. { // 6
  118. stmt: &ast.StreamStmt{
  119. Name: ast.StreamName("demo"),
  120. StreamFields: nil,
  121. },
  122. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  123. result: &xsql.Tuple{
  124. Message: xsql.Message{
  125. "abc": float64(34),
  126. "def": "hello",
  127. "ghi": float64(50),
  128. },
  129. },
  130. },
  131. { // 7
  132. stmt: &ast.StreamStmt{
  133. Name: ast.StreamName("demo"),
  134. StreamFields: []ast.StreamField{
  135. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  136. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  137. },
  138. },
  139. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  140. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(34) to float64"),
  141. },
  142. { // 8
  143. stmt: &ast.StreamStmt{
  144. Name: ast.StreamName("demo"),
  145. StreamFields: []ast.StreamField{
  146. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  147. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  148. },
  149. },
  150. data: []byte(`{"abc": 77, "def" : "hello"}`),
  151. result: errors.New("error in preprocessor: field def type mismatch: cannot convert string(hello) to bool"),
  152. },
  153. { // 9
  154. stmt: &ast.StreamStmt{
  155. Name: ast.StreamName("demo"),
  156. StreamFields: []ast.StreamField{
  157. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  158. },
  159. },
  160. data: []byte(`{"a": {"b" : "hello"}}`),
  161. result: errors.New("error in preprocessor: field abc is not found"),
  162. },
  163. { // 10
  164. stmt: &ast.StreamStmt{
  165. Name: ast.StreamName("demo"),
  166. StreamFields: nil,
  167. },
  168. data: []byte(`{"a": {"b" : "hello"}}`),
  169. result: &xsql.Tuple{
  170. Message: xsql.Message{
  171. "a": map[string]interface{}{
  172. "b": "hello",
  173. },
  174. },
  175. },
  176. },
  177. // Rec type
  178. { // 11
  179. stmt: &ast.StreamStmt{
  180. Name: ast.StreamName("demo"),
  181. StreamFields: []ast.StreamField{
  182. {Name: "a", FieldType: &ast.RecType{
  183. StreamFields: []ast.StreamField{
  184. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  185. },
  186. }},
  187. },
  188. },
  189. data: []byte(`{"a": {"b" : "hello"}}`),
  190. result: &xsql.Tuple{
  191. Message: xsql.Message{
  192. "a": map[string]interface{}{
  193. "b": "hello",
  194. },
  195. },
  196. },
  197. },
  198. { // 12
  199. stmt: &ast.StreamStmt{
  200. Name: ast.StreamName("demo"),
  201. StreamFields: []ast.StreamField{
  202. {Name: "a", FieldType: &ast.RecType{
  203. StreamFields: []ast.StreamField{
  204. {Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  205. },
  206. }},
  207. },
  208. },
  209. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  210. result: errors.New("error in preprocessor: field a type mismatch: field b type mismatch: cannot convert string(32) to float64"),
  211. },
  212. { // 13
  213. stmt: &ast.StreamStmt{
  214. Name: ast.StreamName("demo"),
  215. StreamFields: nil,
  216. },
  217. data: []byte(`{"a": {"b" : "32"}}`),
  218. result: &xsql.Tuple{
  219. Message: xsql.Message{
  220. "a": map[string]interface{}{
  221. "b": "32",
  222. },
  223. },
  224. },
  225. },
  226. // Array of complex type
  227. { // 14
  228. stmt: &ast.StreamStmt{
  229. Name: ast.StreamName("demo"),
  230. StreamFields: []ast.StreamField{
  231. {Name: "a", FieldType: &ast.ArrayType{
  232. Type: ast.STRUCT,
  233. FieldType: &ast.RecType{
  234. StreamFields: []ast.StreamField{
  235. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  236. },
  237. },
  238. }},
  239. },
  240. },
  241. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  242. result: &xsql.Tuple{
  243. Message: xsql.Message{
  244. "a": []interface{}{
  245. map[string]interface{}{"b": "hello1"},
  246. map[string]interface{}{"b": "hello2"},
  247. },
  248. },
  249. },
  250. },
  251. { // 15
  252. stmt: &ast.StreamStmt{
  253. Name: ast.StreamName("demo"),
  254. StreamFields: []ast.StreamField{
  255. {Name: "a", FieldType: &ast.ArrayType{
  256. Type: ast.STRUCT,
  257. FieldType: &ast.RecType{
  258. StreamFields: []ast.StreamField{
  259. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  260. },
  261. },
  262. }},
  263. },
  264. },
  265. data: []byte(`{"a": []}`),
  266. result: &xsql.Tuple{
  267. Message: xsql.Message{
  268. "a": make([]interface{}, 0),
  269. },
  270. },
  271. },
  272. { // 16
  273. stmt: &ast.StreamStmt{
  274. Name: ast.StreamName("demo"),
  275. StreamFields: []ast.StreamField{
  276. {Name: "a", FieldType: &ast.ArrayType{
  277. Type: ast.STRUCT,
  278. FieldType: &ast.RecType{
  279. StreamFields: []ast.StreamField{
  280. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  281. },
  282. },
  283. }},
  284. },
  285. },
  286. data: []byte(`{"a": null}`),
  287. result: &xsql.Tuple{
  288. Message: xsql.Message{
  289. "a": []interface{}(nil),
  290. },
  291. },
  292. },
  293. { // 17
  294. stmt: &ast.StreamStmt{
  295. Name: ast.StreamName("demo"),
  296. StreamFields: []ast.StreamField{
  297. {Name: "a", FieldType: &ast.ArrayType{
  298. Type: ast.STRUCT,
  299. FieldType: &ast.RecType{
  300. StreamFields: []ast.StreamField{
  301. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  302. },
  303. },
  304. }},
  305. },
  306. },
  307. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  308. result: &xsql.Tuple{
  309. Message: xsql.Message{
  310. "a": []interface{}{
  311. map[string]interface{}(nil),
  312. map[string]interface{}{"b": "hello2"},
  313. },
  314. },
  315. },
  316. },
  317. { // 18
  318. stmt: &ast.StreamStmt{
  319. Name: ast.StreamName("demo"),
  320. StreamFields: []ast.StreamField{
  321. {Name: "a", FieldType: &ast.ArrayType{
  322. Type: ast.ARRAY,
  323. FieldType: &ast.ArrayType{
  324. Type: ast.BIGINT,
  325. },
  326. }},
  327. },
  328. },
  329. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  330. result: &xsql.Tuple{
  331. Message: xsql.Message{
  332. "a": []interface{}{
  333. []interface{}{int64(50), int64(60), int64(70)},
  334. []interface{}{int64(66)},
  335. []interface{}{int64(77)},
  336. },
  337. },
  338. },
  339. },
  340. { // 19
  341. stmt: &ast.StreamStmt{
  342. Name: ast.StreamName("demo"),
  343. StreamFields: []ast.StreamField{
  344. {Name: "a", FieldType: &ast.ArrayType{
  345. Type: ast.ARRAY,
  346. FieldType: &ast.ArrayType{
  347. Type: ast.BIGINT,
  348. },
  349. }},
  350. },
  351. },
  352. data: []byte(`{"a": [null, [66], [77]]}`),
  353. result: &xsql.Tuple{
  354. Message: xsql.Message{
  355. "a": []interface{}{
  356. []interface{}(nil),
  357. []interface{}{int64(66)},
  358. []interface{}{int64(77)},
  359. },
  360. },
  361. },
  362. },
  363. { // 20
  364. stmt: &ast.StreamStmt{
  365. Name: ast.StreamName("demo"),
  366. StreamFields: nil,
  367. },
  368. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  369. result: &xsql.Tuple{
  370. Message: xsql.Message{
  371. "a": []interface{}{
  372. map[string]interface{}{"b": "hello1"},
  373. map[string]interface{}{"b": "hello2"},
  374. },
  375. },
  376. },
  377. },
  378. { // 21
  379. stmt: &ast.StreamStmt{
  380. Name: ast.StreamName("demo"),
  381. StreamFields: []ast.StreamField{
  382. {Name: "a", FieldType: &ast.ArrayType{
  383. Type: ast.FLOAT,
  384. }},
  385. },
  386. },
  387. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  388. result: errors.New("error in preprocessor: field a type mismatch: expect array but got [\"55\", \"77\"]"),
  389. },
  390. { // 22
  391. stmt: &ast.StreamStmt{
  392. Name: ast.StreamName("demo"),
  393. StreamFields: nil,
  394. },
  395. data: []byte(`{"a": [55, 77]}`),
  396. result: &xsql.Tuple{
  397. Message: xsql.Message{
  398. "a": []interface{}{
  399. float64(55),
  400. float64(77),
  401. },
  402. },
  403. },
  404. },
  405. // Rec of complex type
  406. { // 23
  407. stmt: &ast.StreamStmt{
  408. Name: ast.StreamName("demo"),
  409. StreamFields: []ast.StreamField{
  410. {Name: "a", FieldType: &ast.RecType{
  411. StreamFields: []ast.StreamField{
  412. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  413. {Name: "c", FieldType: &ast.RecType{
  414. StreamFields: []ast.StreamField{
  415. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  416. },
  417. }},
  418. },
  419. }},
  420. },
  421. },
  422. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  423. result: &xsql.Tuple{
  424. Message: xsql.Message{
  425. "a": map[string]interface{}{
  426. "b": "hello",
  427. "c": map[string]interface{}{
  428. "d": int64(35),
  429. },
  430. },
  431. },
  432. },
  433. },
  434. { // 24
  435. stmt: &ast.StreamStmt{
  436. Name: ast.StreamName("demo"),
  437. StreamFields: []ast.StreamField{
  438. {Name: "a", FieldType: &ast.RecType{
  439. StreamFields: []ast.StreamField{
  440. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  441. {Name: "c", FieldType: &ast.RecType{
  442. StreamFields: []ast.StreamField{
  443. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  444. },
  445. }},
  446. },
  447. }},
  448. },
  449. },
  450. data: []byte(`{"a": null}`),
  451. result: &xsql.Tuple{
  452. Message: xsql.Message{
  453. "a": map[string]interface{}(nil),
  454. },
  455. },
  456. },
  457. { // 25
  458. stmt: &ast.StreamStmt{
  459. Name: ast.StreamName("demo"),
  460. StreamFields: []ast.StreamField{
  461. {Name: "a", FieldType: &ast.RecType{
  462. StreamFields: []ast.StreamField{
  463. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  464. {Name: "c", FieldType: &ast.ArrayType{
  465. Type: ast.FLOAT,
  466. }},
  467. },
  468. }},
  469. },
  470. },
  471. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  472. result: &xsql.Tuple{
  473. Message: xsql.Message{
  474. "a": map[string]interface{}{
  475. "b": "hello",
  476. "c": []interface{}{
  477. 35.2, 38.2,
  478. },
  479. },
  480. },
  481. },
  482. },
  483. { // 26
  484. stmt: &ast.StreamStmt{
  485. Name: ast.StreamName("demo"),
  486. StreamFields: []ast.StreamField{
  487. {Name: "a", FieldType: &ast.RecType{
  488. StreamFields: []ast.StreamField{
  489. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  490. {Name: "c", FieldType: &ast.ArrayType{
  491. Type: ast.FLOAT,
  492. }},
  493. },
  494. }},
  495. },
  496. },
  497. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  498. result: &xsql.Tuple{
  499. Message: xsql.Message{
  500. "a": map[string]interface{}{
  501. "b": "hello",
  502. "c": []interface{}(nil),
  503. },
  504. },
  505. },
  506. },
  507. { // 27
  508. stmt: &ast.StreamStmt{
  509. Name: ast.StreamName("demo"),
  510. StreamFields: []ast.StreamField{
  511. {Name: "a", FieldType: &ast.RecType{
  512. StreamFields: []ast.StreamField{
  513. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  514. {Name: "c", FieldType: &ast.ArrayType{
  515. Type: ast.FLOAT,
  516. }},
  517. },
  518. }},
  519. },
  520. },
  521. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  522. result: errors.New("error in preprocessor: field a type mismatch: field c type mismatch: array element type mismatch: cannot convert <nil>(<nil>) to float64"),
  523. },
  524. { // 28
  525. stmt: &ast.StreamStmt{
  526. Name: ast.StreamName("demo"),
  527. StreamFields: nil,
  528. },
  529. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  530. result: &xsql.Tuple{
  531. Message: xsql.Message{
  532. "a": map[string]interface{}{
  533. "b": "hello",
  534. "c": map[string]interface{}{
  535. "d": 35.2,
  536. },
  537. },
  538. },
  539. },
  540. },
  541. }
  542. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  543. defer conf.CloseLogger()
  544. contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
  545. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  546. for i, tt := range tests {
  547. pp := &Preprocessor{checkSchema: true}
  548. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  549. dm := make(map[string]interface{})
  550. if e := json.Unmarshal(tt.data, &dm); e != nil {
  551. log.Fatal(e)
  552. return
  553. } else {
  554. tuple := &xsql.Tuple{Message: dm}
  555. fv, afv := xsql.NewFunctionValuersForOp(nil)
  556. result := pp.Apply(ctx, tuple, fv, afv)
  557. if !reflect.DeepEqual(tt.result, result) {
  558. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  559. }
  560. }
  561. }
  562. }
  563. func TestPreprocessorTime_Apply(t *testing.T) {
  564. tests := []struct {
  565. stmt *ast.StreamStmt
  566. data []byte
  567. result interface{}
  568. }{
  569. { // 0
  570. stmt: &ast.StreamStmt{
  571. Name: ast.StreamName("demo"),
  572. StreamFields: []ast.StreamField{
  573. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  574. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  575. },
  576. },
  577. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  578. result: &xsql.Tuple{
  579. Message: xsql.Message{
  580. "abc": cast.TimeFromUnixMilli(1568854515000),
  581. "def": cast.TimeFromUnixMilli(1568854573431),
  582. },
  583. },
  584. },
  585. { // 1
  586. stmt: &ast.StreamStmt{
  587. Name: ast.StreamName("demo"),
  588. StreamFields: nil,
  589. },
  590. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  591. result: &xsql.Tuple{
  592. Message: xsql.Message{
  593. "abc": "2019-09-19T00:55:15.000Z",
  594. "def": float64(1568854573431),
  595. },
  596. },
  597. },
  598. { // 2
  599. stmt: &ast.StreamStmt{
  600. Name: ast.StreamName("demo"),
  601. StreamFields: []ast.StreamField{
  602. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  603. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  604. },
  605. },
  606. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  607. result: errors.New("error in preprocessor: field abc type mismatch: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  608. },
  609. { // 3
  610. stmt: &ast.StreamStmt{
  611. Name: ast.StreamName("demo"),
  612. StreamFields: []ast.StreamField{
  613. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  614. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  615. },
  616. Options: &ast.Options{
  617. DATASOURCE: "users",
  618. FORMAT: "JSON",
  619. KEY: "USERID",
  620. CONF_KEY: "srv1",
  621. TYPE: "MQTT",
  622. TIMESTAMP: "USERID",
  623. TIMESTAMP_FORMAT: "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  624. },
  625. },
  626. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  627. result: &xsql.Tuple{Message: xsql.Message{
  628. "abc": cast.TimeFromUnixMilli(1568894115000),
  629. "def": cast.TimeFromUnixMilli(1568854573431),
  630. }},
  631. },
  632. // Array type
  633. { // 4
  634. stmt: &ast.StreamStmt{
  635. Name: ast.StreamName("demo"),
  636. StreamFields: []ast.StreamField{
  637. {Name: "a", FieldType: &ast.ArrayType{
  638. Type: ast.DATETIME,
  639. }},
  640. },
  641. },
  642. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  643. result: &xsql.Tuple{
  644. Message: xsql.Message{
  645. "a": []interface{}{
  646. cast.TimeFromUnixMilli(1568854515123),
  647. cast.TimeFromUnixMilli(1568854573431),
  648. },
  649. },
  650. },
  651. },
  652. { // 5
  653. stmt: &ast.StreamStmt{
  654. Name: ast.StreamName("demo"),
  655. StreamFields: []ast.StreamField{
  656. {Name: "a", FieldType: &ast.RecType{
  657. StreamFields: []ast.StreamField{
  658. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  659. {Name: "c", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  660. },
  661. }},
  662. },
  663. },
  664. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  665. result: &xsql.Tuple{
  666. Message: xsql.Message{
  667. "a": map[string]interface{}{
  668. "b": "hello",
  669. "c": cast.TimeFromUnixMilli(1568854515000),
  670. },
  671. },
  672. },
  673. },
  674. }
  675. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  676. defer conf.CloseLogger()
  677. contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
  678. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  679. for i, tt := range tests {
  680. pp := &Preprocessor{checkSchema: true}
  681. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  682. if tt.stmt.Options != nil {
  683. pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
  684. }
  685. dm := make(map[string]interface{})
  686. if e := json.Unmarshal(tt.data, &dm); e != nil {
  687. log.Fatal(e)
  688. return
  689. } else {
  690. tuple := &xsql.Tuple{Message: dm}
  691. fv, afv := xsql.NewFunctionValuersForOp(nil)
  692. result := pp.Apply(ctx, tuple, fv, afv)
  693. // workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  694. if rt, ok := result.(*xsql.Tuple); ok {
  695. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  696. rt.Message["abc"] = rtt.UTC()
  697. }
  698. }
  699. if !reflect.DeepEqual(tt.result, result) {
  700. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  701. }
  702. }
  703. }
  704. }
  705. func convertFields(o ast.StreamFields) []interface{} {
  706. if o == nil {
  707. return nil
  708. }
  709. fields := make([]interface{}, len(o))
  710. for i := range o {
  711. fields[i] = &o[i]
  712. }
  713. return fields
  714. }
  715. func TestPreprocessorEventtime_Apply(t *testing.T) {
  716. tests := []struct {
  717. stmt *ast.StreamStmt
  718. data []byte
  719. result interface{}
  720. }{
  721. // Basic type
  722. { // 0
  723. stmt: &ast.StreamStmt{
  724. Name: ast.StreamName("demo"),
  725. StreamFields: []ast.StreamField{
  726. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  727. },
  728. Options: &ast.Options{
  729. DATASOURCE: "users",
  730. FORMAT: "JSON",
  731. KEY: "USERID",
  732. CONF_KEY: "srv1",
  733. TYPE: "MQTT",
  734. TIMESTAMP: "abc",
  735. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  736. },
  737. },
  738. data: []byte(`{"abc": 1568854515000}`),
  739. result: &xsql.Tuple{
  740. Message: xsql.Message{
  741. "abc": int64(1568854515000),
  742. }, Timestamp: 1568854515000,
  743. },
  744. },
  745. { // 1
  746. stmt: &ast.StreamStmt{
  747. Name: ast.StreamName("demo"),
  748. StreamFields: nil,
  749. Options: &ast.Options{
  750. DATASOURCE: "users",
  751. FORMAT: "JSON",
  752. KEY: "USERID",
  753. CONF_KEY: "srv1",
  754. TYPE: "MQTT",
  755. TIMESTAMP: "abc",
  756. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  757. },
  758. },
  759. data: []byte(`{"abc": 1568854515000}`),
  760. result: &xsql.Tuple{
  761. Message: xsql.Message{
  762. "abc": float64(1568854515000),
  763. }, Timestamp: 1568854515000,
  764. },
  765. },
  766. { // 2
  767. stmt: &ast.StreamStmt{
  768. Name: ast.StreamName("demo"),
  769. StreamFields: []ast.StreamField{
  770. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  771. },
  772. Options: &ast.Options{
  773. DATASOURCE: "users",
  774. TIMESTAMP: "abc",
  775. },
  776. },
  777. data: []byte(`{"abc": true}`),
  778. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  779. },
  780. { // 3
  781. stmt: &ast.StreamStmt{
  782. Name: ast.StreamName("demo"),
  783. StreamFields: []ast.StreamField{
  784. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  785. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  786. },
  787. Options: &ast.Options{
  788. DATASOURCE: "users",
  789. TIMESTAMP: "def",
  790. },
  791. },
  792. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  793. result: &xsql.Tuple{
  794. Message: xsql.Message{
  795. "abc": float64(34),
  796. "def": "2019-09-23T02:47:29.754Z",
  797. "ghi": float64(50),
  798. }, Timestamp: int64(1569206849754),
  799. },
  800. },
  801. { // 4
  802. stmt: &ast.StreamStmt{
  803. Name: ast.StreamName("demo"),
  804. StreamFields: []ast.StreamField{
  805. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  806. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  807. },
  808. Options: &ast.Options{
  809. DATASOURCE: "users",
  810. TIMESTAMP: "abc",
  811. },
  812. },
  813. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  814. result: &xsql.Tuple{
  815. Message: xsql.Message{
  816. "abc": cast.TimeFromUnixMilli(1568854515000),
  817. "def": cast.TimeFromUnixMilli(1568854573431),
  818. }, Timestamp: int64(1568854515000),
  819. },
  820. },
  821. { // 5
  822. stmt: &ast.StreamStmt{
  823. Name: ast.StreamName("demo"),
  824. StreamFields: []ast.StreamField{
  825. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  826. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  827. },
  828. Options: &ast.Options{
  829. DATASOURCE: "users",
  830. TIMESTAMP: "def",
  831. TIMESTAMP_FORMAT: "yyyy-MM-dd'AT'HH:mm:ss",
  832. },
  833. },
  834. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  835. result: &xsql.Tuple{
  836. Message: xsql.Message{
  837. "abc": float64(34),
  838. "def": "2019-09-23AT02:47:29",
  839. "ghi": float64(50),
  840. }, Timestamp: int64(1569206849000),
  841. },
  842. },
  843. { // 6
  844. stmt: &ast.StreamStmt{
  845. Name: ast.StreamName("demo"),
  846. StreamFields: []ast.StreamField{
  847. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  848. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  849. },
  850. Options: &ast.Options{
  851. DATASOURCE: "users",
  852. TIMESTAMP: "def",
  853. TIMESTAMP_FORMAT: "yyyy-MM-ddaHH:mm:ss",
  854. },
  855. },
  856. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  857. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"AT02:47:29\" as \"PM\""),
  858. },
  859. }
  860. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  861. defer conf.CloseLogger()
  862. contextLogger := conf.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  863. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  864. for i, tt := range tests {
  865. pp := &Preprocessor{
  866. checkSchema: true,
  867. defaultFieldProcessor: defaultFieldProcessor{
  868. streamFields: tt.stmt.StreamFields.ToJsonSchema(),
  869. timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
  870. },
  871. isEventTime: true,
  872. timestampField: tt.stmt.Options.TIMESTAMP,
  873. }
  874. dm := make(map[string]interface{})
  875. if e := json.Unmarshal(tt.data, &dm); e != nil {
  876. log.Fatal(e)
  877. return
  878. } else {
  879. tuple := &xsql.Tuple{Message: dm}
  880. fv, afv := xsql.NewFunctionValuersForOp(nil)
  881. result := pp.Apply(ctx, tuple, fv, afv)
  882. // workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  883. if rt, ok := result.(*xsql.Tuple); ok {
  884. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  885. rt.Message["abc"] = rtt.UTC()
  886. }
  887. }
  888. if !reflect.DeepEqual(tt.result, result) {
  889. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  890. }
  891. }
  892. }
  893. }
  894. func TestPreprocessorError(t *testing.T) {
  895. tests := []struct {
  896. stmt *ast.StreamStmt
  897. data []byte
  898. result interface{}
  899. }{
  900. {
  901. stmt: &ast.StreamStmt{
  902. Name: ast.StreamName("demo"),
  903. StreamFields: []ast.StreamField{
  904. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  905. },
  906. },
  907. data: []byte(`{"abc": "dafsad"}`),
  908. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(dafsad) to int64"),
  909. }, {
  910. stmt: &ast.StreamStmt{
  911. Name: ast.StreamName("demo"),
  912. StreamFields: []ast.StreamField{
  913. {Name: "a", FieldType: &ast.RecType{
  914. StreamFields: []ast.StreamField{
  915. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  916. },
  917. }},
  918. },
  919. },
  920. data: []byte(`{"a": {"d" : "hello"}}`),
  921. result: errors.New("error in preprocessor: field a type mismatch: field b is not found"),
  922. }, {
  923. stmt: &ast.StreamStmt{
  924. Name: ast.StreamName("demo"),
  925. StreamFields: []ast.StreamField{
  926. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  927. },
  928. Options: &ast.Options{
  929. DATASOURCE: "users",
  930. FORMAT: "JSON",
  931. KEY: "USERID",
  932. CONF_KEY: "srv1",
  933. TYPE: "MQTT",
  934. TIMESTAMP: "abc",
  935. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  936. },
  937. },
  938. data: []byte(`{"abc": "not a time"}`),
  939. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(not a time) to int64"),
  940. },
  941. }
  942. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  943. defer conf.CloseLogger()
  944. contextLogger := conf.Log.WithField("rule", "TestPreprocessorError")
  945. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  946. for i, tt := range tests {
  947. pp := &Preprocessor{checkSchema: true}
  948. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  949. dm := make(map[string]interface{})
  950. if e := json.Unmarshal(tt.data, &dm); e != nil {
  951. log.Fatal(e)
  952. return
  953. } else {
  954. tuple := &xsql.Tuple{Message: dm}
  955. fv, afv := xsql.NewFunctionValuersForOp(nil)
  956. result := pp.Apply(ctx, tuple, fv, afv)
  957. if !reflect.DeepEqual(tt.result, result) {
  958. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  959. }
  960. }
  961. }
  962. }
  963. func TestPreprocessorForBinary(t *testing.T) {
  964. docsFolder, err := conf.GetLoc("docs/")
  965. if err != nil {
  966. t.Errorf("Cannot find docs folder: %v", err)
  967. }
  968. image, err := os.ReadFile(path.Join(docsFolder, "cover.jpg"))
  969. if err != nil {
  970. t.Errorf("Cannot read image: %v", err)
  971. }
  972. b64img := base64.StdEncoding.EncodeToString(image)
  973. tests := []struct {
  974. stmt *ast.StreamStmt
  975. data []byte
  976. result interface{}
  977. }{
  978. { // 0
  979. stmt: &ast.StreamStmt{
  980. Name: ast.StreamName("demo"),
  981. StreamFields: []ast.StreamField{
  982. {Name: "a", FieldType: &ast.RecType{
  983. StreamFields: []ast.StreamField{
  984. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  985. },
  986. }},
  987. },
  988. },
  989. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  990. result: &xsql.Tuple{
  991. Message: xsql.Message{
  992. "a": map[string]interface{}{
  993. "b": image,
  994. },
  995. },
  996. },
  997. },
  998. { // 1
  999. stmt: &ast.StreamStmt{
  1000. Name: ast.StreamName("demo"),
  1001. StreamFields: []ast.StreamField{
  1002. {Name: "a", FieldType: &ast.ArrayType{
  1003. Type: ast.BYTEA,
  1004. }},
  1005. },
  1006. },
  1007. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  1008. result: &xsql.Tuple{
  1009. Message: xsql.Message{
  1010. "a": []interface{}{
  1011. image,
  1012. },
  1013. },
  1014. },
  1015. },
  1016. {
  1017. stmt: &ast.StreamStmt{
  1018. Name: ast.StreamName("demo"),
  1019. StreamFields: []ast.StreamField{
  1020. {Name: "a", FieldType: &ast.ArrayType{
  1021. Type: ast.STRUCT,
  1022. FieldType: &ast.RecType{
  1023. StreamFields: []ast.StreamField{
  1024. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  1025. },
  1026. },
  1027. }},
  1028. },
  1029. },
  1030. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  1031. result: &xsql.Tuple{
  1032. Message: xsql.Message{
  1033. "a": []interface{}{
  1034. map[string]interface{}{"b": image},
  1035. },
  1036. },
  1037. },
  1038. },
  1039. }
  1040. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1041. defer conf.CloseLogger()
  1042. contextLogger := conf.Log.WithField("rule", "TestPreprocessorForBinary")
  1043. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1044. for i, tt := range tests {
  1045. pp := &Preprocessor{checkSchema: true}
  1046. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  1047. format := message.FormatJson
  1048. ccc, _ := converter.GetOrCreateConverter(&ast.Options{FORMAT: format})
  1049. nCtx := context.WithValue(ctx, context.DecodeKey, ccc)
  1050. if dm, e := nCtx.Decode(tt.data); e != nil {
  1051. log.Fatal(e)
  1052. return
  1053. } else {
  1054. tuple := &xsql.Tuple{Message: dm}
  1055. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1056. result := pp.Apply(ctx, tuple, fv, afv)
  1057. if !reflect.DeepEqual(tt.result, result) {
  1058. t.Errorf("%d. %q\n\nresult mismatch", i, tuple)
  1059. }
  1060. }
  1061. }
  1062. }