preprocessor_test.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "log"
  21. "os"
  22. "path"
  23. "reflect"
  24. "testing"
  25. "time"
  26. "github.com/lf-edge/ekuiper/internal/conf"
  27. "github.com/lf-edge/ekuiper/internal/converter"
  28. "github.com/lf-edge/ekuiper/internal/topo/context"
  29. "github.com/lf-edge/ekuiper/internal/xsql"
  30. "github.com/lf-edge/ekuiper/pkg/ast"
  31. "github.com/lf-edge/ekuiper/pkg/cast"
  32. "github.com/lf-edge/ekuiper/pkg/message"
  33. )
  34. func TestPreprocessor_Apply(t *testing.T) {
  35. var tests = []struct {
  36. stmt *ast.StreamStmt
  37. data []byte
  38. result interface{}
  39. }{
  40. //Basic type
  41. { // 0
  42. stmt: &ast.StreamStmt{
  43. Name: ast.StreamName("demo"),
  44. StreamFields: []ast.StreamField{
  45. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  46. },
  47. },
  48. data: []byte(`{"a": 6}`),
  49. result: errors.New("error in preprocessor: field abc is not found"),
  50. },
  51. { // 1
  52. stmt: &ast.StreamStmt{
  53. Name: ast.StreamName("demo"),
  54. StreamFields: []ast.StreamField{
  55. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  56. },
  57. },
  58. data: []byte(`{"abc": null}`),
  59. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert <nil>(<nil>) to int64"),
  60. },
  61. { // 2
  62. stmt: &ast.StreamStmt{
  63. Name: ast.StreamName("demo"),
  64. StreamFields: nil,
  65. },
  66. data: []byte(`{"a": 6}`),
  67. result: &xsql.Tuple{Message: xsql.Message{
  68. "a": float64(6),
  69. },
  70. },
  71. },
  72. { // 3
  73. stmt: &ast.StreamStmt{
  74. Name: ast.StreamName("demo"),
  75. StreamFields: []ast.StreamField{
  76. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  77. },
  78. },
  79. data: []byte(`{"abc": 6}`),
  80. result: &xsql.Tuple{Message: xsql.Message{
  81. "abc": int64(6),
  82. },
  83. },
  84. },
  85. { // 4
  86. stmt: &ast.StreamStmt{
  87. Name: ast.StreamName("demo"),
  88. StreamFields: nil,
  89. },
  90. data: []byte(`{"abc": 6}`),
  91. result: &xsql.Tuple{Message: xsql.Message{
  92. "abc": float64(6),
  93. },
  94. },
  95. },
  96. { // 5
  97. stmt: &ast.StreamStmt{
  98. Name: ast.StreamName("demo"),
  99. StreamFields: []ast.StreamField{
  100. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  101. {Name: "dEf", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  102. },
  103. },
  104. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  105. result: &xsql.Tuple{Message: xsql.Message{
  106. "abc": float64(34),
  107. "dEf": "hello",
  108. "def": "hello",
  109. "ghi": float64(50),
  110. },
  111. },
  112. },
  113. { // 6
  114. stmt: &ast.StreamStmt{
  115. Name: ast.StreamName("demo"),
  116. StreamFields: nil,
  117. },
  118. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  119. result: &xsql.Tuple{Message: xsql.Message{
  120. "abc": float64(34),
  121. "def": "hello",
  122. "ghi": float64(50),
  123. },
  124. },
  125. },
  126. { // 7
  127. stmt: &ast.StreamStmt{
  128. Name: ast.StreamName("demo"),
  129. StreamFields: []ast.StreamField{
  130. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  131. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  132. },
  133. },
  134. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  135. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(34) to float64"),
  136. },
  137. { // 8
  138. stmt: &ast.StreamStmt{
  139. Name: ast.StreamName("demo"),
  140. StreamFields: []ast.StreamField{
  141. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  142. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  143. },
  144. },
  145. data: []byte(`{"abc": 77, "def" : "hello"}`),
  146. result: errors.New("error in preprocessor: field def type mismatch: cannot convert string(hello) to bool"),
  147. },
  148. { // 9
  149. stmt: &ast.StreamStmt{
  150. Name: ast.StreamName("demo"),
  151. StreamFields: []ast.StreamField{
  152. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  153. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  154. },
  155. },
  156. data: []byte(`{"a": {"b" : "hello"}}`),
  157. result: errors.New("error in preprocessor: field abc is not found"),
  158. },
  159. { // 10
  160. stmt: &ast.StreamStmt{
  161. Name: ast.StreamName("demo"),
  162. StreamFields: nil,
  163. },
  164. data: []byte(`{"a": {"b" : "hello"}}`),
  165. result: &xsql.Tuple{Message: xsql.Message{
  166. "a": map[string]interface{}{
  167. "b": "hello",
  168. },
  169. },
  170. },
  171. },
  172. //Rec type
  173. { // 11
  174. stmt: &ast.StreamStmt{
  175. Name: ast.StreamName("demo"),
  176. StreamFields: []ast.StreamField{
  177. {Name: "a", FieldType: &ast.RecType{
  178. StreamFields: []ast.StreamField{
  179. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  180. },
  181. }},
  182. },
  183. },
  184. data: []byte(`{"a": {"b" : "hello"}}`),
  185. result: &xsql.Tuple{Message: xsql.Message{
  186. "a": map[string]interface{}{
  187. "b": "hello",
  188. },
  189. },
  190. },
  191. },
  192. { // 12
  193. stmt: &ast.StreamStmt{
  194. Name: ast.StreamName("demo"),
  195. StreamFields: []ast.StreamField{
  196. {Name: "a", FieldType: &ast.RecType{
  197. StreamFields: []ast.StreamField{
  198. {Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  199. },
  200. }},
  201. },
  202. },
  203. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  204. result: errors.New("error in preprocessor: field a type mismatch: field b type mismatch: cannot convert string(32) to float64"),
  205. },
  206. { // 13
  207. stmt: &ast.StreamStmt{
  208. Name: ast.StreamName("demo"),
  209. StreamFields: nil,
  210. },
  211. data: []byte(`{"a": {"b" : "32"}}`),
  212. result: &xsql.Tuple{Message: xsql.Message{
  213. "a": map[string]interface{}{
  214. "b": "32",
  215. },
  216. },
  217. },
  218. },
  219. //Array of complex type
  220. { // 14
  221. stmt: &ast.StreamStmt{
  222. Name: ast.StreamName("demo"),
  223. StreamFields: []ast.StreamField{
  224. {Name: "a", FieldType: &ast.ArrayType{
  225. Type: ast.STRUCT,
  226. FieldType: &ast.RecType{
  227. StreamFields: []ast.StreamField{
  228. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  229. },
  230. },
  231. }},
  232. },
  233. },
  234. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  235. result: &xsql.Tuple{Message: xsql.Message{
  236. "a": []interface{}{
  237. map[string]interface{}{"b": "hello1"},
  238. map[string]interface{}{"b": "hello2"},
  239. },
  240. },
  241. },
  242. },
  243. { // 15
  244. stmt: &ast.StreamStmt{
  245. Name: ast.StreamName("demo"),
  246. StreamFields: []ast.StreamField{
  247. {Name: "a", FieldType: &ast.ArrayType{
  248. Type: ast.STRUCT,
  249. FieldType: &ast.RecType{
  250. StreamFields: []ast.StreamField{
  251. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  252. },
  253. },
  254. }},
  255. },
  256. },
  257. data: []byte(`{"a": []}`),
  258. result: &xsql.Tuple{Message: xsql.Message{
  259. "a": make([]interface{}, 0),
  260. },
  261. },
  262. },
  263. { // 16
  264. stmt: &ast.StreamStmt{
  265. Name: ast.StreamName("demo"),
  266. StreamFields: []ast.StreamField{
  267. {Name: "a", FieldType: &ast.ArrayType{
  268. Type: ast.STRUCT,
  269. FieldType: &ast.RecType{
  270. StreamFields: []ast.StreamField{
  271. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  272. },
  273. },
  274. }},
  275. },
  276. },
  277. data: []byte(`{"a": null}`),
  278. result: &xsql.Tuple{Message: xsql.Message{
  279. "a": []interface{}(nil),
  280. },
  281. },
  282. },
  283. { // 17
  284. stmt: &ast.StreamStmt{
  285. Name: ast.StreamName("demo"),
  286. StreamFields: []ast.StreamField{
  287. {Name: "a", FieldType: &ast.ArrayType{
  288. Type: ast.STRUCT,
  289. FieldType: &ast.RecType{
  290. StreamFields: []ast.StreamField{
  291. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  292. },
  293. },
  294. }},
  295. },
  296. },
  297. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  298. result: &xsql.Tuple{Message: xsql.Message{
  299. "a": []interface{}{
  300. map[string]interface{}(nil),
  301. map[string]interface{}{"b": "hello2"},
  302. },
  303. },
  304. },
  305. },
  306. { // 18
  307. stmt: &ast.StreamStmt{
  308. Name: ast.StreamName("demo"),
  309. StreamFields: []ast.StreamField{
  310. {Name: "a", FieldType: &ast.ArrayType{
  311. Type: ast.ARRAY,
  312. FieldType: &ast.ArrayType{
  313. Type: ast.BIGINT,
  314. },
  315. }},
  316. },
  317. },
  318. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  319. result: &xsql.Tuple{Message: xsql.Message{
  320. "a": []interface{}{
  321. []interface{}{int64(50), int64(60), int64(70)},
  322. []interface{}{int64(66)},
  323. []interface{}{int64(77)},
  324. },
  325. },
  326. },
  327. },
  328. { // 19
  329. stmt: &ast.StreamStmt{
  330. Name: ast.StreamName("demo"),
  331. StreamFields: []ast.StreamField{
  332. {Name: "a", FieldType: &ast.ArrayType{
  333. Type: ast.ARRAY,
  334. FieldType: &ast.ArrayType{
  335. Type: ast.BIGINT,
  336. },
  337. }},
  338. },
  339. },
  340. data: []byte(`{"a": [null, [66], [77]]}`),
  341. result: &xsql.Tuple{Message: xsql.Message{
  342. "a": []interface{}{
  343. []interface{}(nil),
  344. []interface{}{int64(66)},
  345. []interface{}{int64(77)},
  346. },
  347. },
  348. },
  349. },
  350. { // 20
  351. stmt: &ast.StreamStmt{
  352. Name: ast.StreamName("demo"),
  353. StreamFields: nil,
  354. },
  355. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  356. result: &xsql.Tuple{Message: xsql.Message{
  357. "a": []interface{}{
  358. map[string]interface{}{"b": "hello1"},
  359. map[string]interface{}{"b": "hello2"},
  360. },
  361. },
  362. },
  363. },
  364. { // 21
  365. stmt: &ast.StreamStmt{
  366. Name: ast.StreamName("demo"),
  367. StreamFields: []ast.StreamField{
  368. {Name: "a", FieldType: &ast.ArrayType{
  369. Type: ast.FLOAT,
  370. }},
  371. },
  372. },
  373. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  374. result: errors.New("error in preprocessor: field a type mismatch: expect array but got [\"55\", \"77\"]"),
  375. },
  376. { // 22
  377. stmt: &ast.StreamStmt{
  378. Name: ast.StreamName("demo"),
  379. StreamFields: nil,
  380. },
  381. data: []byte(`{"a": [55, 77]}`),
  382. result: &xsql.Tuple{Message: xsql.Message{
  383. "a": []interface{}{
  384. float64(55),
  385. float64(77),
  386. },
  387. },
  388. },
  389. },
  390. //Rec of complex type
  391. { // 23
  392. stmt: &ast.StreamStmt{
  393. Name: ast.StreamName("demo"),
  394. StreamFields: []ast.StreamField{
  395. {Name: "a", FieldType: &ast.RecType{
  396. StreamFields: []ast.StreamField{
  397. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  398. {Name: "c", FieldType: &ast.RecType{
  399. StreamFields: []ast.StreamField{
  400. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  401. },
  402. }},
  403. },
  404. }},
  405. },
  406. },
  407. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  408. result: &xsql.Tuple{Message: xsql.Message{
  409. "a": map[string]interface{}{
  410. "b": "hello",
  411. "c": map[string]interface{}{
  412. "d": int64(35),
  413. },
  414. },
  415. },
  416. },
  417. },
  418. { // 24
  419. stmt: &ast.StreamStmt{
  420. Name: ast.StreamName("demo"),
  421. StreamFields: []ast.StreamField{
  422. {Name: "a", FieldType: &ast.RecType{
  423. StreamFields: []ast.StreamField{
  424. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  425. {Name: "c", FieldType: &ast.RecType{
  426. StreamFields: []ast.StreamField{
  427. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  428. },
  429. }},
  430. },
  431. }},
  432. },
  433. },
  434. data: []byte(`{"a": null}`),
  435. result: &xsql.Tuple{Message: xsql.Message{
  436. "a": map[string]interface{}(nil),
  437. },
  438. },
  439. },
  440. { // 25
  441. stmt: &ast.StreamStmt{
  442. Name: ast.StreamName("demo"),
  443. StreamFields: []ast.StreamField{
  444. {Name: "a", FieldType: &ast.RecType{
  445. StreamFields: []ast.StreamField{
  446. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  447. {Name: "c", FieldType: &ast.ArrayType{
  448. Type: ast.FLOAT,
  449. }},
  450. },
  451. }},
  452. },
  453. },
  454. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  455. result: &xsql.Tuple{Message: xsql.Message{
  456. "a": map[string]interface{}{
  457. "b": "hello",
  458. "c": []interface{}{
  459. 35.2, 38.2,
  460. },
  461. },
  462. },
  463. },
  464. },
  465. { // 26
  466. stmt: &ast.StreamStmt{
  467. Name: ast.StreamName("demo"),
  468. StreamFields: []ast.StreamField{
  469. {Name: "a", FieldType: &ast.RecType{
  470. StreamFields: []ast.StreamField{
  471. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  472. {Name: "c", FieldType: &ast.ArrayType{
  473. Type: ast.FLOAT,
  474. }},
  475. },
  476. }},
  477. },
  478. },
  479. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  480. result: &xsql.Tuple{Message: xsql.Message{
  481. "a": map[string]interface{}{
  482. "b": "hello",
  483. "c": []interface{}(nil),
  484. },
  485. },
  486. },
  487. },
  488. { // 27
  489. stmt: &ast.StreamStmt{
  490. Name: ast.StreamName("demo"),
  491. StreamFields: []ast.StreamField{
  492. {Name: "a", FieldType: &ast.RecType{
  493. StreamFields: []ast.StreamField{
  494. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  495. {Name: "c", FieldType: &ast.ArrayType{
  496. Type: ast.FLOAT,
  497. }},
  498. },
  499. }},
  500. },
  501. },
  502. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  503. result: errors.New("error in preprocessor: field a type mismatch: field c type mismatch: array element type mismatch: cannot convert <nil>(<nil>) to float64"),
  504. },
  505. { // 28
  506. stmt: &ast.StreamStmt{
  507. Name: ast.StreamName("demo"),
  508. StreamFields: nil,
  509. },
  510. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  511. result: &xsql.Tuple{Message: xsql.Message{
  512. "a": map[string]interface{}{
  513. "b": "hello",
  514. "c": map[string]interface{}{
  515. "d": 35.2,
  516. },
  517. },
  518. },
  519. },
  520. },
  521. }
  522. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  523. defer conf.CloseLogger()
  524. contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
  525. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  526. for i, tt := range tests {
  527. pp := &Preprocessor{checkSchema: true}
  528. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  529. dm := make(map[string]interface{})
  530. if e := json.Unmarshal(tt.data, &dm); e != nil {
  531. log.Fatal(e)
  532. return
  533. } else {
  534. tuple := &xsql.Tuple{Message: dm}
  535. fv, afv := xsql.NewFunctionValuersForOp(nil)
  536. result := pp.Apply(ctx, tuple, fv, afv)
  537. if !reflect.DeepEqual(tt.result, result) {
  538. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  539. }
  540. }
  541. }
  542. }
  543. func TestPreprocessorTime_Apply(t *testing.T) {
  544. var tests = []struct {
  545. stmt *ast.StreamStmt
  546. data []byte
  547. result interface{}
  548. }{
  549. { // 0
  550. stmt: &ast.StreamStmt{
  551. Name: ast.StreamName("demo"),
  552. StreamFields: []ast.StreamField{
  553. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  554. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  555. },
  556. },
  557. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  558. result: &xsql.Tuple{Message: xsql.Message{
  559. "abc": cast.TimeFromUnixMilli(1568854515000),
  560. "def": cast.TimeFromUnixMilli(1568854573431),
  561. },
  562. },
  563. },
  564. { // 1
  565. stmt: &ast.StreamStmt{
  566. Name: ast.StreamName("demo"),
  567. StreamFields: nil,
  568. },
  569. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  570. result: &xsql.Tuple{Message: xsql.Message{
  571. "abc": "2019-09-19T00:55:15.000Z",
  572. "def": float64(1568854573431),
  573. },
  574. },
  575. },
  576. { // 2
  577. stmt: &ast.StreamStmt{
  578. Name: ast.StreamName("demo"),
  579. StreamFields: []ast.StreamField{
  580. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  581. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  582. },
  583. },
  584. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  585. result: errors.New("error in preprocessor: field abc type mismatch: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  586. },
  587. { // 3
  588. stmt: &ast.StreamStmt{
  589. Name: ast.StreamName("demo"),
  590. StreamFields: []ast.StreamField{
  591. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  592. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  593. },
  594. Options: &ast.Options{
  595. DATASOURCE: "users",
  596. FORMAT: "JSON",
  597. KEY: "USERID",
  598. CONF_KEY: "srv1",
  599. TYPE: "MQTT",
  600. TIMESTAMP: "USERID",
  601. TIMESTAMP_FORMAT: "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  602. },
  603. },
  604. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  605. result: &xsql.Tuple{Message: xsql.Message{
  606. "abc": cast.TimeFromUnixMilli(1568894115000),
  607. "def": cast.TimeFromUnixMilli(1568854573431),
  608. }},
  609. },
  610. //Array type
  611. { // 4
  612. stmt: &ast.StreamStmt{
  613. Name: ast.StreamName("demo"),
  614. StreamFields: []ast.StreamField{
  615. {Name: "a", FieldType: &ast.ArrayType{
  616. Type: ast.DATETIME,
  617. }},
  618. },
  619. },
  620. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  621. result: &xsql.Tuple{Message: xsql.Message{
  622. "a": []interface{}{
  623. cast.TimeFromUnixMilli(1568854515123),
  624. cast.TimeFromUnixMilli(1568854573431),
  625. },
  626. },
  627. },
  628. },
  629. { // 5
  630. stmt: &ast.StreamStmt{
  631. Name: ast.StreamName("demo"),
  632. StreamFields: []ast.StreamField{
  633. {Name: "a", FieldType: &ast.RecType{
  634. StreamFields: []ast.StreamField{
  635. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  636. {Name: "c", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  637. },
  638. }},
  639. },
  640. },
  641. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  642. result: &xsql.Tuple{Message: xsql.Message{
  643. "a": map[string]interface{}{
  644. "b": "hello",
  645. "c": cast.TimeFromUnixMilli(1568854515000),
  646. },
  647. },
  648. },
  649. },
  650. }
  651. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  652. defer conf.CloseLogger()
  653. contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
  654. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  655. for i, tt := range tests {
  656. pp := &Preprocessor{checkSchema: true}
  657. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  658. if tt.stmt.Options != nil {
  659. pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
  660. }
  661. dm := make(map[string]interface{})
  662. if e := json.Unmarshal(tt.data, &dm); e != nil {
  663. log.Fatal(e)
  664. return
  665. } else {
  666. tuple := &xsql.Tuple{Message: dm}
  667. fv, afv := xsql.NewFunctionValuersForOp(nil)
  668. result := pp.Apply(ctx, tuple, fv, afv)
  669. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  670. if rt, ok := result.(*xsql.Tuple); ok {
  671. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  672. rt.Message["abc"] = rtt.UTC()
  673. }
  674. }
  675. if !reflect.DeepEqual(tt.result, result) {
  676. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  677. }
  678. }
  679. }
  680. }
  681. func convertFields(o ast.StreamFields) []interface{} {
  682. if o == nil {
  683. return nil
  684. }
  685. fields := make([]interface{}, len(o))
  686. for i := range o {
  687. fields[i] = &o[i]
  688. }
  689. return fields
  690. }
  691. func TestPreprocessorEventtime_Apply(t *testing.T) {
  692. var tests = []struct {
  693. stmt *ast.StreamStmt
  694. data []byte
  695. result interface{}
  696. }{
  697. //Basic type
  698. { // 0
  699. stmt: &ast.StreamStmt{
  700. Name: ast.StreamName("demo"),
  701. StreamFields: []ast.StreamField{
  702. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  703. },
  704. Options: &ast.Options{
  705. DATASOURCE: "users",
  706. FORMAT: "JSON",
  707. KEY: "USERID",
  708. CONF_KEY: "srv1",
  709. TYPE: "MQTT",
  710. TIMESTAMP: "abc",
  711. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  712. },
  713. },
  714. data: []byte(`{"abc": 1568854515000}`),
  715. result: &xsql.Tuple{Message: xsql.Message{
  716. "abc": int64(1568854515000),
  717. }, Timestamp: 1568854515000,
  718. },
  719. },
  720. { // 1
  721. stmt: &ast.StreamStmt{
  722. Name: ast.StreamName("demo"),
  723. StreamFields: nil,
  724. Options: &ast.Options{
  725. DATASOURCE: "users",
  726. FORMAT: "JSON",
  727. KEY: "USERID",
  728. CONF_KEY: "srv1",
  729. TYPE: "MQTT",
  730. TIMESTAMP: "abc",
  731. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  732. },
  733. },
  734. data: []byte(`{"abc": 1568854515000}`),
  735. result: &xsql.Tuple{Message: xsql.Message{
  736. "abc": float64(1568854515000),
  737. }, Timestamp: 1568854515000,
  738. },
  739. },
  740. { // 2
  741. stmt: &ast.StreamStmt{
  742. Name: ast.StreamName("demo"),
  743. StreamFields: []ast.StreamField{
  744. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  745. },
  746. Options: &ast.Options{
  747. DATASOURCE: "users",
  748. TIMESTAMP: "abc",
  749. },
  750. },
  751. data: []byte(`{"abc": true}`),
  752. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  753. },
  754. { // 3
  755. stmt: &ast.StreamStmt{
  756. Name: ast.StreamName("demo"),
  757. StreamFields: []ast.StreamField{
  758. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  759. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  760. },
  761. Options: &ast.Options{
  762. DATASOURCE: "users",
  763. TIMESTAMP: "def",
  764. },
  765. },
  766. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  767. result: &xsql.Tuple{Message: xsql.Message{
  768. "abc": float64(34),
  769. "def": "2019-09-23T02:47:29.754Z",
  770. "ghi": float64(50),
  771. }, Timestamp: int64(1569206849754),
  772. },
  773. },
  774. { // 4
  775. stmt: &ast.StreamStmt{
  776. Name: ast.StreamName("demo"),
  777. StreamFields: []ast.StreamField{
  778. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  779. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  780. },
  781. Options: &ast.Options{
  782. DATASOURCE: "users",
  783. TIMESTAMP: "abc",
  784. },
  785. },
  786. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  787. result: &xsql.Tuple{Message: xsql.Message{
  788. "abc": cast.TimeFromUnixMilli(1568854515000),
  789. "def": cast.TimeFromUnixMilli(1568854573431),
  790. }, Timestamp: int64(1568854515000),
  791. },
  792. },
  793. { // 5
  794. stmt: &ast.StreamStmt{
  795. Name: ast.StreamName("demo"),
  796. StreamFields: []ast.StreamField{
  797. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  798. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  799. },
  800. Options: &ast.Options{
  801. DATASOURCE: "users",
  802. TIMESTAMP: "def",
  803. TIMESTAMP_FORMAT: "yyyy-MM-dd'AT'HH:mm:ss",
  804. },
  805. },
  806. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  807. result: &xsql.Tuple{Message: xsql.Message{
  808. "abc": float64(34),
  809. "def": "2019-09-23AT02:47:29",
  810. "ghi": float64(50),
  811. }, Timestamp: int64(1569206849000),
  812. },
  813. },
  814. { // 6
  815. stmt: &ast.StreamStmt{
  816. Name: ast.StreamName("demo"),
  817. StreamFields: []ast.StreamField{
  818. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  819. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  820. },
  821. Options: &ast.Options{
  822. DATASOURCE: "users",
  823. TIMESTAMP: "def",
  824. TIMESTAMP_FORMAT: "yyyy-MM-ddaHH:mm:ss",
  825. },
  826. },
  827. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  828. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
  829. },
  830. }
  831. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  832. defer conf.CloseLogger()
  833. contextLogger := conf.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  834. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  835. for i, tt := range tests {
  836. pp := &Preprocessor{
  837. checkSchema: true,
  838. defaultFieldProcessor: defaultFieldProcessor{
  839. streamFields: tt.stmt.StreamFields.ToJsonSchema(),
  840. timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
  841. },
  842. isEventTime: true,
  843. timestampField: tt.stmt.Options.TIMESTAMP,
  844. }
  845. dm := make(map[string]interface{})
  846. if e := json.Unmarshal(tt.data, &dm); e != nil {
  847. log.Fatal(e)
  848. return
  849. } else {
  850. tuple := &xsql.Tuple{Message: dm}
  851. fv, afv := xsql.NewFunctionValuersForOp(nil)
  852. result := pp.Apply(ctx, tuple, fv, afv)
  853. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  854. if rt, ok := result.(*xsql.Tuple); ok {
  855. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  856. rt.Message["abc"] = rtt.UTC()
  857. }
  858. }
  859. if !reflect.DeepEqual(tt.result, result) {
  860. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  861. }
  862. }
  863. }
  864. }
  865. func TestPreprocessorError(t *testing.T) {
  866. tests := []struct {
  867. stmt *ast.StreamStmt
  868. data []byte
  869. result interface{}
  870. }{
  871. {
  872. stmt: &ast.StreamStmt{
  873. Name: ast.StreamName("demo"),
  874. StreamFields: []ast.StreamField{
  875. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  876. },
  877. },
  878. data: []byte(`{"abc": "dafsad"}`),
  879. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(dafsad) to int64"),
  880. }, {
  881. stmt: &ast.StreamStmt{
  882. Name: ast.StreamName("demo"),
  883. StreamFields: []ast.StreamField{
  884. {Name: "a", FieldType: &ast.RecType{
  885. StreamFields: []ast.StreamField{
  886. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  887. },
  888. }},
  889. },
  890. },
  891. data: []byte(`{"a": {"d" : "hello"}}`),
  892. result: errors.New("error in preprocessor: field a type mismatch: field b is not found"),
  893. }, {
  894. stmt: &ast.StreamStmt{
  895. Name: ast.StreamName("demo"),
  896. StreamFields: []ast.StreamField{
  897. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  898. },
  899. Options: &ast.Options{
  900. DATASOURCE: "users",
  901. FORMAT: "JSON",
  902. KEY: "USERID",
  903. CONF_KEY: "srv1",
  904. TYPE: "MQTT",
  905. TIMESTAMP: "abc",
  906. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  907. },
  908. },
  909. data: []byte(`{"abc": "not a time"}`),
  910. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(not a time) to int64"),
  911. },
  912. }
  913. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  914. defer conf.CloseLogger()
  915. contextLogger := conf.Log.WithField("rule", "TestPreprocessorError")
  916. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  917. for i, tt := range tests {
  918. pp := &Preprocessor{checkSchema: true}
  919. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  920. dm := make(map[string]interface{})
  921. if e := json.Unmarshal(tt.data, &dm); e != nil {
  922. log.Fatal(e)
  923. return
  924. } else {
  925. tuple := &xsql.Tuple{Message: dm}
  926. fv, afv := xsql.NewFunctionValuersForOp(nil)
  927. result := pp.Apply(ctx, tuple, fv, afv)
  928. if !reflect.DeepEqual(tt.result, result) {
  929. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  930. }
  931. }
  932. }
  933. }
  934. func TestPreprocessorForBinary(t *testing.T) {
  935. docsFolder, err := conf.GetLoc("docs/")
  936. if err != nil {
  937. t.Errorf("Cannot find docs folder: %v", err)
  938. }
  939. image, err := os.ReadFile(path.Join(docsFolder, "cover.jpg"))
  940. if err != nil {
  941. t.Errorf("Cannot read image: %v", err)
  942. }
  943. b64img := base64.StdEncoding.EncodeToString(image)
  944. var tests = []struct {
  945. stmt *ast.StreamStmt
  946. data []byte
  947. result interface{}
  948. }{
  949. { // 0
  950. stmt: &ast.StreamStmt{
  951. Name: ast.StreamName("demo"),
  952. StreamFields: []ast.StreamField{
  953. {Name: "a", FieldType: &ast.RecType{
  954. StreamFields: []ast.StreamField{
  955. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  956. },
  957. }},
  958. },
  959. },
  960. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  961. result: &xsql.Tuple{Message: xsql.Message{
  962. "a": map[string]interface{}{
  963. "b": image,
  964. },
  965. },
  966. },
  967. },
  968. { // 1
  969. stmt: &ast.StreamStmt{
  970. Name: ast.StreamName("demo"),
  971. StreamFields: []ast.StreamField{
  972. {Name: "a", FieldType: &ast.ArrayType{
  973. Type: ast.BYTEA,
  974. }},
  975. },
  976. },
  977. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  978. result: &xsql.Tuple{Message: xsql.Message{
  979. "a": []interface{}{
  980. image,
  981. },
  982. },
  983. },
  984. },
  985. {
  986. stmt: &ast.StreamStmt{
  987. Name: ast.StreamName("demo"),
  988. StreamFields: []ast.StreamField{
  989. {Name: "a", FieldType: &ast.ArrayType{
  990. Type: ast.STRUCT,
  991. FieldType: &ast.RecType{
  992. StreamFields: []ast.StreamField{
  993. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  994. },
  995. },
  996. }},
  997. },
  998. },
  999. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  1000. result: &xsql.Tuple{Message: xsql.Message{
  1001. "a": []interface{}{
  1002. map[string]interface{}{"b": image},
  1003. },
  1004. },
  1005. },
  1006. },
  1007. }
  1008. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1009. defer conf.CloseLogger()
  1010. contextLogger := conf.Log.WithField("rule", "TestPreprocessorForBinary")
  1011. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1012. for i, tt := range tests {
  1013. pp := &Preprocessor{checkSchema: true}
  1014. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  1015. format := message.FormatJson
  1016. ccc, _ := converter.GetOrCreateConverter(format, "", "")
  1017. nCtx := context.WithValue(ctx, context.DecodeKey, ccc)
  1018. if dm, e := nCtx.Decode(tt.data); e != nil {
  1019. log.Fatal(e)
  1020. return
  1021. } else {
  1022. tuple := &xsql.Tuple{Message: dm}
  1023. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1024. result := pp.Apply(ctx, tuple, fv, afv)
  1025. if !reflect.DeepEqual(tt.result, result) {
  1026. t.Errorf("%d. %q\n\nresult mismatch", i, tuple)
  1027. }
  1028. }
  1029. }
  1030. }