preprocessor_test.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "log"
  21. "os"
  22. "path"
  23. "reflect"
  24. "testing"
  25. "time"
  26. "github.com/stretchr/testify/assert"
  27. "github.com/lf-edge/ekuiper/internal/conf"
  28. "github.com/lf-edge/ekuiper/internal/converter"
  29. "github.com/lf-edge/ekuiper/internal/topo/context"
  30. "github.com/lf-edge/ekuiper/internal/xsql"
  31. "github.com/lf-edge/ekuiper/pkg/ast"
  32. "github.com/lf-edge/ekuiper/pkg/cast"
  33. "github.com/lf-edge/ekuiper/pkg/message"
  34. )
  35. func TestPreprocessor_Apply(t *testing.T) {
  36. tests := []struct {
  37. stmt *ast.StreamStmt
  38. data []byte
  39. result interface{}
  40. }{
  41. // Basic type
  42. { // 0
  43. stmt: &ast.StreamStmt{
  44. Name: ast.StreamName("demo"),
  45. StreamFields: []ast.StreamField{
  46. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  47. },
  48. },
  49. data: []byte(`{"a": 6}`),
  50. result: errors.New("error in preprocessor: field abc is not found"),
  51. },
  52. { // 1
  53. stmt: &ast.StreamStmt{
  54. Name: ast.StreamName("demo"),
  55. StreamFields: []ast.StreamField{
  56. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  57. },
  58. },
  59. data: []byte(`{"abc": null}`),
  60. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert <nil>(<nil>) to int64"),
  61. },
  62. { // 2
  63. stmt: &ast.StreamStmt{
  64. Name: ast.StreamName("demo"),
  65. StreamFields: nil,
  66. },
  67. data: []byte(`{"a": 6}`),
  68. result: &xsql.Tuple{
  69. Message: xsql.Message{
  70. "a": float64(6),
  71. },
  72. },
  73. },
  74. { // 3
  75. stmt: &ast.StreamStmt{
  76. Name: ast.StreamName("demo"),
  77. StreamFields: []ast.StreamField{
  78. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  79. },
  80. },
  81. data: []byte(`{"abc": 6}`),
  82. result: &xsql.Tuple{
  83. Message: xsql.Message{
  84. "abc": int64(6),
  85. },
  86. },
  87. },
  88. { // 4
  89. stmt: &ast.StreamStmt{
  90. Name: ast.StreamName("demo"),
  91. StreamFields: nil,
  92. },
  93. data: []byte(`{"abc": 6}`),
  94. result: &xsql.Tuple{
  95. Message: xsql.Message{
  96. "abc": float64(6),
  97. },
  98. },
  99. },
  100. { // 5
  101. stmt: &ast.StreamStmt{
  102. Name: ast.StreamName("demo"),
  103. StreamFields: []ast.StreamField{
  104. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  105. {Name: "dEf", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  106. },
  107. },
  108. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  109. result: &xsql.Tuple{
  110. Message: xsql.Message{
  111. "abc": float64(34),
  112. "dEf": "hello",
  113. "def": "hello",
  114. "ghi": float64(50),
  115. },
  116. },
  117. },
  118. { // 6
  119. stmt: &ast.StreamStmt{
  120. Name: ast.StreamName("demo"),
  121. StreamFields: nil,
  122. },
  123. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  124. result: &xsql.Tuple{
  125. Message: xsql.Message{
  126. "abc": float64(34),
  127. "def": "hello",
  128. "ghi": float64(50),
  129. },
  130. },
  131. },
  132. { // 7
  133. stmt: &ast.StreamStmt{
  134. Name: ast.StreamName("demo"),
  135. StreamFields: []ast.StreamField{
  136. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  137. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  138. },
  139. },
  140. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  141. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(34) to float64"),
  142. },
  143. { // 8
  144. stmt: &ast.StreamStmt{
  145. Name: ast.StreamName("demo"),
  146. StreamFields: []ast.StreamField{
  147. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  148. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  149. },
  150. },
  151. data: []byte(`{"abc": 77, "def" : "hello"}`),
  152. result: errors.New("error in preprocessor: field def type mismatch: cannot convert string(hello) to bool"),
  153. },
  154. { // 9
  155. stmt: &ast.StreamStmt{
  156. Name: ast.StreamName("demo"),
  157. StreamFields: []ast.StreamField{
  158. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  159. },
  160. },
  161. data: []byte(`{"a": {"b" : "hello"}}`),
  162. result: errors.New("error in preprocessor: field abc is not found"),
  163. },
  164. { // 10
  165. stmt: &ast.StreamStmt{
  166. Name: ast.StreamName("demo"),
  167. StreamFields: nil,
  168. },
  169. data: []byte(`{"a": {"b" : "hello"}}`),
  170. result: &xsql.Tuple{
  171. Message: xsql.Message{
  172. "a": map[string]interface{}{
  173. "b": "hello",
  174. },
  175. },
  176. },
  177. },
  178. // Rec type
  179. { // 11
  180. stmt: &ast.StreamStmt{
  181. Name: ast.StreamName("demo"),
  182. StreamFields: []ast.StreamField{
  183. {Name: "a", FieldType: &ast.RecType{
  184. StreamFields: []ast.StreamField{
  185. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  186. },
  187. }},
  188. },
  189. },
  190. data: []byte(`{"a": {"b" : "hello"}}`),
  191. result: &xsql.Tuple{
  192. Message: xsql.Message{
  193. "a": map[string]interface{}{
  194. "b": "hello",
  195. },
  196. },
  197. },
  198. },
  199. { // 12
  200. stmt: &ast.StreamStmt{
  201. Name: ast.StreamName("demo"),
  202. StreamFields: []ast.StreamField{
  203. {Name: "a", FieldType: &ast.RecType{
  204. StreamFields: []ast.StreamField{
  205. {Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  206. },
  207. }},
  208. },
  209. },
  210. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  211. result: errors.New("error in preprocessor: field a type mismatch: field b type mismatch: cannot convert string(32) to float64"),
  212. },
  213. { // 13
  214. stmt: &ast.StreamStmt{
  215. Name: ast.StreamName("demo"),
  216. StreamFields: nil,
  217. },
  218. data: []byte(`{"a": {"b" : "32"}}`),
  219. result: &xsql.Tuple{
  220. Message: xsql.Message{
  221. "a": map[string]interface{}{
  222. "b": "32",
  223. },
  224. },
  225. },
  226. },
  227. // Array of complex type
  228. { // 14
  229. stmt: &ast.StreamStmt{
  230. Name: ast.StreamName("demo"),
  231. StreamFields: []ast.StreamField{
  232. {Name: "a", FieldType: &ast.ArrayType{
  233. Type: ast.STRUCT,
  234. FieldType: &ast.RecType{
  235. StreamFields: []ast.StreamField{
  236. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  237. },
  238. },
  239. }},
  240. },
  241. },
  242. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  243. result: &xsql.Tuple{
  244. Message: xsql.Message{
  245. "a": []interface{}{
  246. map[string]interface{}{"b": "hello1"},
  247. map[string]interface{}{"b": "hello2"},
  248. },
  249. },
  250. },
  251. },
  252. { // 15
  253. stmt: &ast.StreamStmt{
  254. Name: ast.StreamName("demo"),
  255. StreamFields: []ast.StreamField{
  256. {Name: "a", FieldType: &ast.ArrayType{
  257. Type: ast.STRUCT,
  258. FieldType: &ast.RecType{
  259. StreamFields: []ast.StreamField{
  260. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  261. },
  262. },
  263. }},
  264. },
  265. },
  266. data: []byte(`{"a": []}`),
  267. result: &xsql.Tuple{
  268. Message: xsql.Message{
  269. "a": make([]interface{}, 0),
  270. },
  271. },
  272. },
  273. { // 16
  274. stmt: &ast.StreamStmt{
  275. Name: ast.StreamName("demo"),
  276. StreamFields: []ast.StreamField{
  277. {Name: "a", FieldType: &ast.ArrayType{
  278. Type: ast.STRUCT,
  279. FieldType: &ast.RecType{
  280. StreamFields: []ast.StreamField{
  281. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  282. },
  283. },
  284. }},
  285. },
  286. },
  287. data: []byte(`{"a": null}`),
  288. result: &xsql.Tuple{
  289. Message: xsql.Message{
  290. "a": []interface{}(nil),
  291. },
  292. },
  293. },
  294. { // 17
  295. stmt: &ast.StreamStmt{
  296. Name: ast.StreamName("demo"),
  297. StreamFields: []ast.StreamField{
  298. {Name: "a", FieldType: &ast.ArrayType{
  299. Type: ast.STRUCT,
  300. FieldType: &ast.RecType{
  301. StreamFields: []ast.StreamField{
  302. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  303. },
  304. },
  305. }},
  306. },
  307. },
  308. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  309. result: &xsql.Tuple{
  310. Message: xsql.Message{
  311. "a": []interface{}{
  312. map[string]interface{}(nil),
  313. map[string]interface{}{"b": "hello2"},
  314. },
  315. },
  316. },
  317. },
  318. { // 18
  319. stmt: &ast.StreamStmt{
  320. Name: ast.StreamName("demo"),
  321. StreamFields: []ast.StreamField{
  322. {Name: "a", FieldType: &ast.ArrayType{
  323. Type: ast.ARRAY,
  324. FieldType: &ast.ArrayType{
  325. Type: ast.BIGINT,
  326. },
  327. }},
  328. },
  329. },
  330. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  331. result: &xsql.Tuple{
  332. Message: xsql.Message{
  333. "a": []interface{}{
  334. []interface{}{int64(50), int64(60), int64(70)},
  335. []interface{}{int64(66)},
  336. []interface{}{int64(77)},
  337. },
  338. },
  339. },
  340. },
  341. { // 19
  342. stmt: &ast.StreamStmt{
  343. Name: ast.StreamName("demo"),
  344. StreamFields: []ast.StreamField{
  345. {Name: "a", FieldType: &ast.ArrayType{
  346. Type: ast.ARRAY,
  347. FieldType: &ast.ArrayType{
  348. Type: ast.BIGINT,
  349. },
  350. }},
  351. },
  352. },
  353. data: []byte(`{"a": [null, [66], [77]]}`),
  354. result: &xsql.Tuple{
  355. Message: xsql.Message{
  356. "a": []interface{}{
  357. []interface{}(nil),
  358. []interface{}{int64(66)},
  359. []interface{}{int64(77)},
  360. },
  361. },
  362. },
  363. },
  364. { // 20
  365. stmt: &ast.StreamStmt{
  366. Name: ast.StreamName("demo"),
  367. StreamFields: nil,
  368. },
  369. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  370. result: &xsql.Tuple{
  371. Message: xsql.Message{
  372. "a": []interface{}{
  373. map[string]interface{}{"b": "hello1"},
  374. map[string]interface{}{"b": "hello2"},
  375. },
  376. },
  377. },
  378. },
  379. { // 21
  380. stmt: &ast.StreamStmt{
  381. Name: ast.StreamName("demo"),
  382. StreamFields: []ast.StreamField{
  383. {Name: "a", FieldType: &ast.ArrayType{
  384. Type: ast.FLOAT,
  385. }},
  386. },
  387. },
  388. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  389. result: errors.New("error in preprocessor: field a type mismatch: expect array but got [\"55\", \"77\"]"),
  390. },
  391. { // 22
  392. stmt: &ast.StreamStmt{
  393. Name: ast.StreamName("demo"),
  394. StreamFields: nil,
  395. },
  396. data: []byte(`{"a": [55, 77]}`),
  397. result: &xsql.Tuple{
  398. Message: xsql.Message{
  399. "a": []interface{}{
  400. float64(55),
  401. float64(77),
  402. },
  403. },
  404. },
  405. },
  406. // Rec of complex type
  407. { // 23
  408. stmt: &ast.StreamStmt{
  409. Name: ast.StreamName("demo"),
  410. StreamFields: []ast.StreamField{
  411. {Name: "a", FieldType: &ast.RecType{
  412. StreamFields: []ast.StreamField{
  413. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  414. {Name: "c", FieldType: &ast.RecType{
  415. StreamFields: []ast.StreamField{
  416. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  417. },
  418. }},
  419. },
  420. }},
  421. },
  422. },
  423. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  424. result: &xsql.Tuple{
  425. Message: xsql.Message{
  426. "a": map[string]interface{}{
  427. "b": "hello",
  428. "c": map[string]interface{}{
  429. "d": int64(35),
  430. },
  431. },
  432. },
  433. },
  434. },
  435. { // 24
  436. stmt: &ast.StreamStmt{
  437. Name: ast.StreamName("demo"),
  438. StreamFields: []ast.StreamField{
  439. {Name: "a", FieldType: &ast.RecType{
  440. StreamFields: []ast.StreamField{
  441. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  442. {Name: "c", FieldType: &ast.RecType{
  443. StreamFields: []ast.StreamField{
  444. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  445. },
  446. }},
  447. },
  448. }},
  449. },
  450. },
  451. data: []byte(`{"a": null}`),
  452. result: &xsql.Tuple{
  453. Message: xsql.Message{
  454. "a": map[string]interface{}(nil),
  455. },
  456. },
  457. },
  458. { // 25
  459. stmt: &ast.StreamStmt{
  460. Name: ast.StreamName("demo"),
  461. StreamFields: []ast.StreamField{
  462. {Name: "a", FieldType: &ast.RecType{
  463. StreamFields: []ast.StreamField{
  464. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  465. {Name: "c", FieldType: &ast.ArrayType{
  466. Type: ast.FLOAT,
  467. }},
  468. },
  469. }},
  470. },
  471. },
  472. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  473. result: &xsql.Tuple{
  474. Message: xsql.Message{
  475. "a": map[string]interface{}{
  476. "b": "hello",
  477. "c": []interface{}{
  478. 35.2, 38.2,
  479. },
  480. },
  481. },
  482. },
  483. },
  484. { // 26
  485. stmt: &ast.StreamStmt{
  486. Name: ast.StreamName("demo"),
  487. StreamFields: []ast.StreamField{
  488. {Name: "a", FieldType: &ast.RecType{
  489. StreamFields: []ast.StreamField{
  490. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  491. {Name: "c", FieldType: &ast.ArrayType{
  492. Type: ast.FLOAT,
  493. }},
  494. },
  495. }},
  496. },
  497. },
  498. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  499. result: &xsql.Tuple{
  500. Message: xsql.Message{
  501. "a": map[string]interface{}{
  502. "b": "hello",
  503. "c": []interface{}(nil),
  504. },
  505. },
  506. },
  507. },
  508. { // 27
  509. stmt: &ast.StreamStmt{
  510. Name: ast.StreamName("demo"),
  511. StreamFields: []ast.StreamField{
  512. {Name: "a", FieldType: &ast.RecType{
  513. StreamFields: []ast.StreamField{
  514. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  515. {Name: "c", FieldType: &ast.ArrayType{
  516. Type: ast.FLOAT,
  517. }},
  518. },
  519. }},
  520. },
  521. },
  522. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  523. result: errors.New("error in preprocessor: field a type mismatch: field c type mismatch: array element type mismatch: cannot convert <nil>(<nil>) to float64"),
  524. },
  525. { // 28
  526. stmt: &ast.StreamStmt{
  527. Name: ast.StreamName("demo"),
  528. StreamFields: nil,
  529. },
  530. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  531. result: &xsql.Tuple{
  532. Message: xsql.Message{
  533. "a": map[string]interface{}{
  534. "b": "hello",
  535. "c": map[string]interface{}{
  536. "d": 35.2,
  537. },
  538. },
  539. },
  540. },
  541. },
  542. }
  543. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  544. defer conf.CloseLogger()
  545. contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
  546. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  547. for i, tt := range tests {
  548. pp := &Preprocessor{checkSchema: true}
  549. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  550. dm := make(map[string]interface{})
  551. if e := json.Unmarshal(tt.data, &dm); e != nil {
  552. log.Fatal(e)
  553. return
  554. } else {
  555. tuple := &xsql.Tuple{Message: dm}
  556. fv, afv := xsql.NewFunctionValuersForOp(nil)
  557. result := pp.Apply(ctx, tuple, fv, afv)
  558. if !reflect.DeepEqual(tt.result, result) {
  559. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  560. }
  561. }
  562. }
  563. }
  564. func TestPreprocessorTime_Apply(t *testing.T) {
  565. tests := []struct {
  566. stmt *ast.StreamStmt
  567. data []byte
  568. result interface{}
  569. }{
  570. { // 0
  571. stmt: &ast.StreamStmt{
  572. Name: ast.StreamName("demo"),
  573. StreamFields: []ast.StreamField{
  574. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  575. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  576. },
  577. },
  578. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  579. result: &xsql.Tuple{
  580. Message: xsql.Message{
  581. "abc": cast.TimeFromUnixMilli(1568854515000),
  582. "def": cast.TimeFromUnixMilli(1568854573431),
  583. },
  584. },
  585. },
  586. { // 1
  587. stmt: &ast.StreamStmt{
  588. Name: ast.StreamName("demo"),
  589. StreamFields: nil,
  590. },
  591. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  592. result: &xsql.Tuple{
  593. Message: xsql.Message{
  594. "abc": "2019-09-19T00:55:15.000Z",
  595. "def": float64(1568854573431),
  596. },
  597. },
  598. },
  599. { // 2
  600. stmt: &ast.StreamStmt{
  601. Name: ast.StreamName("demo"),
  602. StreamFields: []ast.StreamField{
  603. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  604. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  605. },
  606. },
  607. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  608. result: errors.New("error in preprocessor: field abc type mismatch: Can't parse string as time: 2019-09-19T00:55:1dd5Z"),
  609. },
  610. { // 3
  611. stmt: &ast.StreamStmt{
  612. Name: ast.StreamName("demo"),
  613. StreamFields: []ast.StreamField{
  614. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  615. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  616. },
  617. Options: &ast.Options{
  618. DATASOURCE: "users",
  619. FORMAT: "JSON",
  620. KEY: "USERID",
  621. CONF_KEY: "srv1",
  622. TYPE: "MQTT",
  623. TIMESTAMP: "USERID",
  624. TIMESTAMP_FORMAT: "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  625. },
  626. },
  627. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  628. result: &xsql.Tuple{Message: xsql.Message{
  629. "abc": cast.TimeFromUnixMilli(1568894115000),
  630. "def": cast.TimeFromUnixMilli(1568854573431),
  631. }},
  632. },
  633. // Array type
  634. { // 4
  635. stmt: &ast.StreamStmt{
  636. Name: ast.StreamName("demo"),
  637. StreamFields: []ast.StreamField{
  638. {Name: "a", FieldType: &ast.ArrayType{
  639. Type: ast.DATETIME,
  640. }},
  641. },
  642. },
  643. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  644. result: &xsql.Tuple{
  645. Message: xsql.Message{
  646. "a": []interface{}{
  647. cast.TimeFromUnixMilli(1568854515123),
  648. cast.TimeFromUnixMilli(1568854573431),
  649. },
  650. },
  651. },
  652. },
  653. { // 5
  654. stmt: &ast.StreamStmt{
  655. Name: ast.StreamName("demo"),
  656. StreamFields: []ast.StreamField{
  657. {Name: "a", FieldType: &ast.RecType{
  658. StreamFields: []ast.StreamField{
  659. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  660. {Name: "c", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  661. },
  662. }},
  663. },
  664. },
  665. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  666. result: &xsql.Tuple{
  667. Message: xsql.Message{
  668. "a": map[string]interface{}{
  669. "b": "hello",
  670. "c": cast.TimeFromUnixMilli(1568854515000),
  671. },
  672. },
  673. },
  674. },
  675. }
  676. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  677. defer conf.CloseLogger()
  678. contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
  679. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  680. for i, tt := range tests {
  681. timestampFormat := ""
  682. if tt.stmt.Options != nil {
  683. timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
  684. }
  685. pp, e := NewPreprocessor(false, tt.stmt.StreamFields.ToJsonSchema(), false, nil, false, "", timestampFormat, false, true)
  686. assert.NoError(t, e)
  687. dm := make(map[string]interface{})
  688. if e := json.Unmarshal(tt.data, &dm); e != nil {
  689. log.Fatal(e)
  690. return
  691. } else {
  692. tuple := &xsql.Tuple{Message: dm}
  693. fv, afv := xsql.NewFunctionValuersForOp(nil)
  694. result := pp.Apply(ctx, tuple, fv, afv)
  695. // workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  696. if rt, ok := result.(*xsql.Tuple); ok {
  697. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  698. rt.Message["abc"] = rtt.UTC()
  699. }
  700. }
  701. if !reflect.DeepEqual(tt.result, result) {
  702. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  703. }
  704. }
  705. }
  706. }
  707. func convertFields(o ast.StreamFields) []interface{} {
  708. if o == nil {
  709. return nil
  710. }
  711. fields := make([]interface{}, len(o))
  712. for i := range o {
  713. fields[i] = &o[i]
  714. }
  715. return fields
  716. }
  717. func TestPreprocessorEventtime_Apply(t *testing.T) {
  718. tests := []struct {
  719. stmt *ast.StreamStmt
  720. data []byte
  721. result interface{}
  722. }{
  723. // Basic type
  724. { // 0
  725. stmt: &ast.StreamStmt{
  726. Name: ast.StreamName("demo"),
  727. StreamFields: []ast.StreamField{
  728. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  729. },
  730. Options: &ast.Options{
  731. DATASOURCE: "users",
  732. FORMAT: "JSON",
  733. KEY: "USERID",
  734. CONF_KEY: "srv1",
  735. TYPE: "MQTT",
  736. TIMESTAMP: "abc",
  737. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  738. },
  739. },
  740. data: []byte(`{"abc": 1568854515000}`),
  741. result: &xsql.Tuple{
  742. Message: xsql.Message{
  743. "abc": int64(1568854515000),
  744. }, Timestamp: 1568854515000,
  745. },
  746. },
  747. { // 1
  748. stmt: &ast.StreamStmt{
  749. Name: ast.StreamName("demo"),
  750. StreamFields: nil,
  751. Options: &ast.Options{
  752. DATASOURCE: "users",
  753. FORMAT: "JSON",
  754. KEY: "USERID",
  755. CONF_KEY: "srv1",
  756. TYPE: "MQTT",
  757. TIMESTAMP: "abc",
  758. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  759. },
  760. },
  761. data: []byte(`{"abc": 1568854515000}`),
  762. result: &xsql.Tuple{
  763. Message: xsql.Message{
  764. "abc": float64(1568854515000),
  765. }, Timestamp: 1568854515000,
  766. },
  767. },
  768. { // 2
  769. stmt: &ast.StreamStmt{
  770. Name: ast.StreamName("demo"),
  771. StreamFields: []ast.StreamField{
  772. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  773. },
  774. Options: &ast.Options{
  775. DATASOURCE: "users",
  776. TIMESTAMP: "abc",
  777. },
  778. },
  779. data: []byte(`{"abc": true}`),
  780. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  781. },
  782. { // 3
  783. stmt: &ast.StreamStmt{
  784. Name: ast.StreamName("demo"),
  785. StreamFields: []ast.StreamField{
  786. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  787. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  788. },
  789. Options: &ast.Options{
  790. DATASOURCE: "users",
  791. TIMESTAMP: "def",
  792. },
  793. },
  794. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  795. result: &xsql.Tuple{
  796. Message: xsql.Message{
  797. "abc": float64(34),
  798. "def": "2019-09-23T02:47:29.754Z",
  799. "ghi": float64(50),
  800. }, Timestamp: int64(1569206849754),
  801. },
  802. },
  803. { // 4
  804. stmt: &ast.StreamStmt{
  805. Name: ast.StreamName("demo"),
  806. StreamFields: []ast.StreamField{
  807. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  808. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  809. },
  810. Options: &ast.Options{
  811. DATASOURCE: "users",
  812. TIMESTAMP: "abc",
  813. },
  814. },
  815. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  816. result: &xsql.Tuple{
  817. Message: xsql.Message{
  818. "abc": cast.TimeFromUnixMilli(1568854515000),
  819. "def": cast.TimeFromUnixMilli(1568854573431),
  820. }, Timestamp: int64(1568854515000),
  821. },
  822. },
  823. { // 5
  824. stmt: &ast.StreamStmt{
  825. Name: ast.StreamName("demo"),
  826. StreamFields: []ast.StreamField{
  827. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  828. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  829. },
  830. Options: &ast.Options{
  831. DATASOURCE: "users",
  832. TIMESTAMP: "def",
  833. TIMESTAMP_FORMAT: "yyyy-MM-dd'AT'HH:mm:ss",
  834. },
  835. },
  836. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  837. result: &xsql.Tuple{
  838. Message: xsql.Message{
  839. "abc": float64(34),
  840. "def": "2019-09-23AT02:47:29",
  841. "ghi": float64(50),
  842. }, Timestamp: int64(1569206849000),
  843. },
  844. },
  845. { // 6
  846. stmt: &ast.StreamStmt{
  847. Name: ast.StreamName("demo"),
  848. StreamFields: []ast.StreamField{
  849. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  850. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  851. },
  852. Options: &ast.Options{
  853. DATASOURCE: "users",
  854. TIMESTAMP: "def",
  855. TIMESTAMP_FORMAT: "yyyy-MM-ddaHH:mm:ss",
  856. },
  857. },
  858. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  859. result: errors.New("cannot convert timestamp field def to timestamp with error Can't parse string as time: 2019-09-23AT02:47:29"),
  860. },
  861. }
  862. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  863. defer conf.CloseLogger()
  864. contextLogger := conf.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  865. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  866. for i, tt := range tests {
  867. pp := &Preprocessor{
  868. checkSchema: true,
  869. defaultFieldProcessor: defaultFieldProcessor{
  870. streamFields: tt.stmt.StreamFields.ToJsonSchema(),
  871. timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
  872. },
  873. isEventTime: true,
  874. timestampField: tt.stmt.Options.TIMESTAMP,
  875. }
  876. dm := make(map[string]interface{})
  877. if e := json.Unmarshal(tt.data, &dm); e != nil {
  878. log.Fatal(e)
  879. return
  880. } else {
  881. tuple := &xsql.Tuple{Message: dm}
  882. fv, afv := xsql.NewFunctionValuersForOp(nil)
  883. result := pp.Apply(ctx, tuple, fv, afv)
  884. // workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  885. if rt, ok := result.(*xsql.Tuple); ok {
  886. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  887. rt.Message["abc"] = rtt.UTC()
  888. }
  889. }
  890. if !reflect.DeepEqual(tt.result, result) {
  891. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  892. }
  893. }
  894. }
  895. }
  896. func TestPreprocessorError(t *testing.T) {
  897. tests := []struct {
  898. stmt *ast.StreamStmt
  899. data []byte
  900. result interface{}
  901. }{
  902. {
  903. stmt: &ast.StreamStmt{
  904. Name: ast.StreamName("demo"),
  905. StreamFields: []ast.StreamField{
  906. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  907. },
  908. },
  909. data: []byte(`{"abc": "dafsad"}`),
  910. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(dafsad) to int64"),
  911. }, {
  912. stmt: &ast.StreamStmt{
  913. Name: ast.StreamName("demo"),
  914. StreamFields: []ast.StreamField{
  915. {Name: "a", FieldType: &ast.RecType{
  916. StreamFields: []ast.StreamField{
  917. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  918. },
  919. }},
  920. },
  921. },
  922. data: []byte(`{"a": {"d" : "hello"}}`),
  923. result: errors.New("error in preprocessor: field a type mismatch: field b is not found"),
  924. }, {
  925. stmt: &ast.StreamStmt{
  926. Name: ast.StreamName("demo"),
  927. StreamFields: []ast.StreamField{
  928. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  929. },
  930. Options: &ast.Options{
  931. DATASOURCE: "users",
  932. FORMAT: "JSON",
  933. KEY: "USERID",
  934. CONF_KEY: "srv1",
  935. TYPE: "MQTT",
  936. TIMESTAMP: "abc",
  937. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  938. },
  939. },
  940. data: []byte(`{"abc": "not a time"}`),
  941. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(not a time) to int64"),
  942. },
  943. }
  944. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  945. defer conf.CloseLogger()
  946. contextLogger := conf.Log.WithField("rule", "TestPreprocessorError")
  947. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  948. for i, tt := range tests {
  949. pp := &Preprocessor{checkSchema: true}
  950. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  951. dm := make(map[string]interface{})
  952. if e := json.Unmarshal(tt.data, &dm); e != nil {
  953. log.Fatal(e)
  954. return
  955. } else {
  956. tuple := &xsql.Tuple{Message: dm}
  957. fv, afv := xsql.NewFunctionValuersForOp(nil)
  958. result := pp.Apply(ctx, tuple, fv, afv)
  959. if !reflect.DeepEqual(tt.result, result) {
  960. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  961. }
  962. }
  963. }
  964. }
  965. func TestPreprocessorForBinary(t *testing.T) {
  966. docsFolder, err := conf.GetLoc("docs/")
  967. if err != nil {
  968. t.Errorf("Cannot find docs folder: %v", err)
  969. }
  970. image, err := os.ReadFile(path.Join(docsFolder, "cover.jpg"))
  971. if err != nil {
  972. t.Errorf("Cannot read image: %v", err)
  973. }
  974. b64img := base64.StdEncoding.EncodeToString(image)
  975. tests := []struct {
  976. stmt *ast.StreamStmt
  977. data []byte
  978. result interface{}
  979. }{
  980. { // 0
  981. stmt: &ast.StreamStmt{
  982. Name: ast.StreamName("demo"),
  983. StreamFields: []ast.StreamField{
  984. {Name: "a", FieldType: &ast.RecType{
  985. StreamFields: []ast.StreamField{
  986. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  987. },
  988. }},
  989. },
  990. },
  991. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  992. result: &xsql.Tuple{
  993. Message: xsql.Message{
  994. "a": map[string]interface{}{
  995. "b": image,
  996. },
  997. },
  998. },
  999. },
  1000. { // 1
  1001. stmt: &ast.StreamStmt{
  1002. Name: ast.StreamName("demo"),
  1003. StreamFields: []ast.StreamField{
  1004. {Name: "a", FieldType: &ast.ArrayType{
  1005. Type: ast.BYTEA,
  1006. }},
  1007. },
  1008. },
  1009. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  1010. result: &xsql.Tuple{
  1011. Message: xsql.Message{
  1012. "a": []interface{}{
  1013. image,
  1014. },
  1015. },
  1016. },
  1017. },
  1018. {
  1019. stmt: &ast.StreamStmt{
  1020. Name: ast.StreamName("demo"),
  1021. StreamFields: []ast.StreamField{
  1022. {Name: "a", FieldType: &ast.ArrayType{
  1023. Type: ast.STRUCT,
  1024. FieldType: &ast.RecType{
  1025. StreamFields: []ast.StreamField{
  1026. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  1027. },
  1028. },
  1029. }},
  1030. },
  1031. },
  1032. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  1033. result: &xsql.Tuple{
  1034. Message: xsql.Message{
  1035. "a": []interface{}{
  1036. map[string]interface{}{"b": image},
  1037. },
  1038. },
  1039. },
  1040. },
  1041. }
  1042. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1043. defer conf.CloseLogger()
  1044. contextLogger := conf.Log.WithField("rule", "TestPreprocessorForBinary")
  1045. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1046. for i, tt := range tests {
  1047. pp := &Preprocessor{checkSchema: true}
  1048. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  1049. format := message.FormatJson
  1050. ccc, _ := converter.GetOrCreateConverter(&ast.Options{FORMAT: format})
  1051. nCtx := context.WithValue(ctx, context.DecodeKey, ccc)
  1052. if dm, e := nCtx.Decode(tt.data); e != nil {
  1053. log.Fatal(e)
  1054. return
  1055. } else {
  1056. tuple := &xsql.Tuple{Message: dm}
  1057. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1058. result := pp.Apply(ctx, tuple, fv, afv)
  1059. if !reflect.DeepEqual(tt.result, result) {
  1060. t.Errorf("%d. %q\n\nresult mismatch", i, tuple)
  1061. }
  1062. }
  1063. }
  1064. }