preprocessor_test.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924
  1. package plans
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "log"
  10. "reflect"
  11. "testing"
  12. "time"
  13. )
  14. func TestPreprocessor_Apply(t *testing.T) {
  15. var tests = []struct {
  16. stmt *xsql.StreamStmt
  17. data []byte
  18. result interface{}
  19. }{
  20. //Basic type
  21. {
  22. stmt: &xsql.StreamStmt{
  23. Name: xsql.StreamName("demo"),
  24. StreamFields: []xsql.StreamField{
  25. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  26. },
  27. },
  28. data: []byte(`{"a": 6}`),
  29. result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
  30. },
  31. {
  32. stmt: &xsql.StreamStmt{
  33. Name: xsql.StreamName("demo"),
  34. StreamFields: []xsql.StreamField{
  35. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  36. },
  37. },
  38. data: []byte(`{"abc": null}`),
  39. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found <nil>(<nil>)"),
  40. },
  41. {
  42. stmt: &xsql.StreamStmt{
  43. Name: xsql.StreamName("demo"),
  44. StreamFields: nil,
  45. },
  46. data: []byte(`{"a": 6}`),
  47. result: &xsql.Tuple{Message: xsql.Message{
  48. "a": float64(6),
  49. },
  50. },
  51. },
  52. {
  53. stmt: &xsql.StreamStmt{
  54. Name: xsql.StreamName("demo"),
  55. StreamFields: []xsql.StreamField{
  56. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  57. },
  58. },
  59. data: []byte(`{"abc": 6}`),
  60. result: &xsql.Tuple{Message: xsql.Message{
  61. "abc": 6,
  62. },
  63. },
  64. },
  65. {
  66. stmt: &xsql.StreamStmt{
  67. Name: xsql.StreamName("demo"),
  68. StreamFields: nil,
  69. },
  70. data: []byte(`{"abc": 6}`),
  71. result: &xsql.Tuple{Message: xsql.Message{
  72. "abc": float64(6),
  73. },
  74. },
  75. },
  76. {
  77. stmt: &xsql.StreamStmt{
  78. Name: xsql.StreamName("demo"),
  79. StreamFields: []xsql.StreamField{
  80. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  81. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  82. },
  83. },
  84. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  85. result: &xsql.Tuple{Message: xsql.Message{
  86. "abc": float64(34),
  87. "def": "hello",
  88. },
  89. },
  90. },
  91. {
  92. stmt: &xsql.StreamStmt{
  93. Name: xsql.StreamName("demo"),
  94. StreamFields: nil,
  95. },
  96. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  97. result: &xsql.Tuple{Message: xsql.Message{
  98. "abc": float64(34),
  99. "def": "hello",
  100. "ghi": float64(50),
  101. },
  102. },
  103. },
  104. {
  105. stmt: &xsql.StreamStmt{
  106. Name: xsql.StreamName("demo"),
  107. StreamFields: []xsql.StreamField{
  108. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  109. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  110. },
  111. },
  112. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  113. result: &xsql.Tuple{Message: xsql.Message{
  114. "abc": float64(34),
  115. "def": "hello",
  116. },
  117. },
  118. },
  119. {
  120. stmt: &xsql.StreamStmt{
  121. Name: xsql.StreamName("demo"),
  122. StreamFields: []xsql.StreamField{
  123. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  124. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  125. },
  126. },
  127. data: []byte(`{"abc": 77, "def" : "hello"}`),
  128. result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
  129. },
  130. {
  131. stmt: &xsql.StreamStmt{
  132. Name: xsql.StreamName("demo"),
  133. StreamFields: []xsql.StreamField{
  134. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  135. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  136. },
  137. },
  138. data: []byte(`{"a": {"b" : "hello"}}`),
  139. result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
  140. },
  141. {
  142. stmt: &xsql.StreamStmt{
  143. Name: xsql.StreamName("demo"),
  144. StreamFields: nil,
  145. },
  146. data: []byte(`{"a": {"b" : "hello"}}`),
  147. result: &xsql.Tuple{Message: xsql.Message{
  148. "a": map[string]interface{}{
  149. "b": "hello",
  150. },
  151. },
  152. },
  153. },
  154. //Rec type
  155. {
  156. stmt: &xsql.StreamStmt{
  157. Name: xsql.StreamName("demo"),
  158. StreamFields: []xsql.StreamField{
  159. {Name: "a", FieldType: &xsql.RecType{
  160. StreamFields: []xsql.StreamField{
  161. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  162. },
  163. }},
  164. },
  165. },
  166. data: []byte(`{"a": {"b" : "hello"}}`),
  167. result: &xsql.Tuple{Message: xsql.Message{
  168. "a": map[string]interface{}{
  169. "b": "hello",
  170. },
  171. },
  172. },
  173. },
  174. {
  175. stmt: &xsql.StreamStmt{
  176. Name: xsql.StreamName("demo"),
  177. StreamFields: []xsql.StreamField{
  178. {Name: "a", FieldType: &xsql.RecType{
  179. StreamFields: []xsql.StreamField{
  180. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  181. },
  182. }},
  183. },
  184. },
  185. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  186. result: &xsql.Tuple{Message: xsql.Message{
  187. "a": map[string]interface{}{
  188. "b": float64(32),
  189. },
  190. },
  191. },
  192. },
  193. {
  194. stmt: &xsql.StreamStmt{
  195. Name: xsql.StreamName("demo"),
  196. StreamFields: nil,
  197. },
  198. data: []byte(`{"a": {"b" : "32"}}`),
  199. result: &xsql.Tuple{Message: xsql.Message{
  200. "a": map[string]interface{}{
  201. "b": "32",
  202. },
  203. },
  204. },
  205. },
  206. //Array of complex type
  207. {
  208. stmt: &xsql.StreamStmt{
  209. Name: xsql.StreamName("demo"),
  210. StreamFields: []xsql.StreamField{
  211. {Name: "a", FieldType: &xsql.ArrayType{
  212. Type: xsql.STRUCT,
  213. FieldType: &xsql.RecType{
  214. StreamFields: []xsql.StreamField{
  215. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  216. },
  217. },
  218. }},
  219. },
  220. },
  221. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  222. result: &xsql.Tuple{Message: xsql.Message{
  223. "a": []map[string]interface{}{
  224. {"b": "hello1"},
  225. {"b": "hello2"},
  226. },
  227. },
  228. },
  229. },
  230. {
  231. stmt: &xsql.StreamStmt{
  232. Name: xsql.StreamName("demo"),
  233. StreamFields: []xsql.StreamField{
  234. {Name: "a", FieldType: &xsql.ArrayType{
  235. Type: xsql.STRUCT,
  236. FieldType: &xsql.RecType{
  237. StreamFields: []xsql.StreamField{
  238. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  239. },
  240. },
  241. }},
  242. },
  243. },
  244. data: []byte(`{"a": []}`),
  245. result: &xsql.Tuple{Message: xsql.Message{
  246. "a": make([]map[string]interface{}, 0),
  247. },
  248. },
  249. },
  250. {
  251. stmt: &xsql.StreamStmt{
  252. Name: xsql.StreamName("demo"),
  253. StreamFields: []xsql.StreamField{
  254. {Name: "a", FieldType: &xsql.ArrayType{
  255. Type: xsql.STRUCT,
  256. FieldType: &xsql.RecType{
  257. StreamFields: []xsql.StreamField{
  258. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  259. },
  260. },
  261. }},
  262. },
  263. },
  264. data: []byte(`{"a": null}`),
  265. result: &xsql.Tuple{Message: xsql.Message{
  266. "a": []map[string]interface{}(nil),
  267. },
  268. },
  269. },
  270. {
  271. stmt: &xsql.StreamStmt{
  272. Name: xsql.StreamName("demo"),
  273. StreamFields: []xsql.StreamField{
  274. {Name: "a", FieldType: &xsql.ArrayType{
  275. Type: xsql.STRUCT,
  276. FieldType: &xsql.RecType{
  277. StreamFields: []xsql.StreamField{
  278. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  279. },
  280. },
  281. }},
  282. },
  283. },
  284. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  285. result: &xsql.Tuple{Message: xsql.Message{
  286. "a": []map[string]interface{}{
  287. nil,
  288. {"b": "hello2"},
  289. },
  290. },
  291. },
  292. },
  293. {
  294. stmt: &xsql.StreamStmt{
  295. Name: xsql.StreamName("demo"),
  296. StreamFields: []xsql.StreamField{
  297. {Name: "a", FieldType: &xsql.ArrayType{
  298. Type: xsql.ARRAY,
  299. FieldType: &xsql.ArrayType{
  300. Type: xsql.BIGINT,
  301. },
  302. }},
  303. },
  304. },
  305. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  306. result: &xsql.Tuple{Message: xsql.Message{
  307. "a": [][]int{
  308. {50, 60, 70},
  309. {66},
  310. {77},
  311. },
  312. },
  313. },
  314. },
  315. {
  316. stmt: &xsql.StreamStmt{
  317. Name: xsql.StreamName("demo"),
  318. StreamFields: []xsql.StreamField{
  319. {Name: "a", FieldType: &xsql.ArrayType{
  320. Type: xsql.ARRAY,
  321. FieldType: &xsql.ArrayType{
  322. Type: xsql.BIGINT,
  323. },
  324. }},
  325. },
  326. },
  327. data: []byte(`{"a": [null, [66], [77]]}`),
  328. result: &xsql.Tuple{Message: xsql.Message{
  329. "a": [][]int{
  330. []int(nil),
  331. {66},
  332. {77},
  333. },
  334. },
  335. },
  336. },
  337. {
  338. stmt: &xsql.StreamStmt{
  339. Name: xsql.StreamName("demo"),
  340. StreamFields: nil,
  341. },
  342. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  343. result: &xsql.Tuple{Message: xsql.Message{
  344. "a": []interface{}{
  345. map[string]interface{}{"b": "hello1"},
  346. map[string]interface{}{"b": "hello2"},
  347. },
  348. },
  349. },
  350. },
  351. {
  352. stmt: &xsql.StreamStmt{
  353. Name: xsql.StreamName("demo"),
  354. StreamFields: []xsql.StreamField{
  355. {Name: "a", FieldType: &xsql.ArrayType{
  356. Type: xsql.FLOAT,
  357. }},
  358. },
  359. },
  360. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  361. result: &xsql.Tuple{Message: xsql.Message{
  362. "a": []float64{
  363. 55,
  364. 77,
  365. },
  366. },
  367. },
  368. },
  369. {
  370. stmt: &xsql.StreamStmt{
  371. Name: xsql.StreamName("demo"),
  372. StreamFields: nil,
  373. },
  374. data: []byte(`{"a": [55, 77]}`),
  375. result: &xsql.Tuple{Message: xsql.Message{
  376. "a": []interface{}{
  377. float64(55),
  378. float64(77),
  379. },
  380. },
  381. },
  382. },
  383. //Rec of complex type
  384. {
  385. stmt: &xsql.StreamStmt{
  386. Name: xsql.StreamName("demo"),
  387. StreamFields: []xsql.StreamField{
  388. {Name: "a", FieldType: &xsql.RecType{
  389. StreamFields: []xsql.StreamField{
  390. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  391. {Name: "c", FieldType: &xsql.RecType{
  392. StreamFields: []xsql.StreamField{
  393. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  394. },
  395. }},
  396. },
  397. }},
  398. },
  399. },
  400. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  401. result: &xsql.Tuple{Message: xsql.Message{
  402. "a": map[string]interface{}{
  403. "b": "hello",
  404. "c": map[string]interface{}{
  405. "d": int(35),
  406. },
  407. },
  408. },
  409. },
  410. },
  411. {
  412. stmt: &xsql.StreamStmt{
  413. Name: xsql.StreamName("demo"),
  414. StreamFields: []xsql.StreamField{
  415. {Name: "a", FieldType: &xsql.RecType{
  416. StreamFields: []xsql.StreamField{
  417. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  418. {Name: "c", FieldType: &xsql.RecType{
  419. StreamFields: []xsql.StreamField{
  420. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  421. },
  422. }},
  423. },
  424. }},
  425. },
  426. },
  427. data: []byte(`{"a": null}`),
  428. result: &xsql.Tuple{Message: xsql.Message{
  429. "a": map[string]interface{}(nil),
  430. },
  431. },
  432. },
  433. {
  434. stmt: &xsql.StreamStmt{
  435. Name: xsql.StreamName("demo"),
  436. StreamFields: []xsql.StreamField{
  437. {Name: "a", FieldType: &xsql.RecType{
  438. StreamFields: []xsql.StreamField{
  439. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  440. {Name: "c", FieldType: &xsql.ArrayType{
  441. Type: xsql.FLOAT,
  442. }},
  443. },
  444. }},
  445. },
  446. },
  447. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  448. result: &xsql.Tuple{Message: xsql.Message{
  449. "a": map[string]interface{}{
  450. "b": "hello",
  451. "c": []float64{
  452. 35.2, 38.2,
  453. },
  454. },
  455. },
  456. },
  457. },
  458. {
  459. stmt: &xsql.StreamStmt{
  460. Name: xsql.StreamName("demo"),
  461. StreamFields: []xsql.StreamField{
  462. {Name: "a", FieldType: &xsql.RecType{
  463. StreamFields: []xsql.StreamField{
  464. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  465. {Name: "c", FieldType: &xsql.ArrayType{
  466. Type: xsql.FLOAT,
  467. }},
  468. },
  469. }},
  470. },
  471. },
  472. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  473. result: &xsql.Tuple{Message: xsql.Message{
  474. "a": map[string]interface{}{
  475. "b": "hello",
  476. "c": []float64(nil),
  477. },
  478. },
  479. },
  480. },
  481. {
  482. stmt: &xsql.StreamStmt{
  483. Name: xsql.StreamName("demo"),
  484. StreamFields: []xsql.StreamField{
  485. {Name: "a", FieldType: &xsql.RecType{
  486. StreamFields: []xsql.StreamField{
  487. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  488. {Name: "c", FieldType: &xsql.ArrayType{
  489. Type: xsql.FLOAT,
  490. }},
  491. },
  492. }},
  493. },
  494. },
  495. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  496. result: errors.New("error in preprocessor: fail to parse field c: invalid data type for [0], expect float but found <nil>(<nil>)"),
  497. },
  498. {
  499. stmt: &xsql.StreamStmt{
  500. Name: xsql.StreamName("demo"),
  501. StreamFields: nil,
  502. },
  503. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  504. result: &xsql.Tuple{Message: xsql.Message{
  505. "a": map[string]interface{}{
  506. "b": "hello",
  507. "c": map[string]interface{}{
  508. "d": 35.2,
  509. },
  510. },
  511. },
  512. },
  513. },
  514. }
  515. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  516. defer common.CloseLogger()
  517. contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
  518. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  519. for i, tt := range tests {
  520. pp := &Preprocessor{streamStmt: tt.stmt}
  521. dm := make(map[string]interface{})
  522. if e := json.Unmarshal(tt.data, &dm); e != nil {
  523. log.Fatal(e)
  524. return
  525. } else {
  526. tuple := &xsql.Tuple{Message: dm}
  527. result := pp.Apply(ctx, tuple)
  528. if !reflect.DeepEqual(tt.result, result) {
  529. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  530. }
  531. }
  532. }
  533. }
  534. func TestPreprocessorTime_Apply(t *testing.T) {
  535. var tests = []struct {
  536. stmt *xsql.StreamStmt
  537. data []byte
  538. result interface{}
  539. }{
  540. {
  541. stmt: &xsql.StreamStmt{
  542. Name: xsql.StreamName("demo"),
  543. StreamFields: []xsql.StreamField{
  544. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  545. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  546. },
  547. },
  548. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  549. result: &xsql.Tuple{Message: xsql.Message{
  550. "abc": common.TimeFromUnixMilli(1568854515000),
  551. "def": common.TimeFromUnixMilli(1568854573431),
  552. },
  553. },
  554. },
  555. {
  556. stmt: &xsql.StreamStmt{
  557. Name: xsql.StreamName("demo"),
  558. StreamFields: nil,
  559. },
  560. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  561. result: &xsql.Tuple{Message: xsql.Message{
  562. "abc": "2019-09-19T00:55:15.000Z",
  563. "def": float64(1568854573431),
  564. },
  565. },
  566. },
  567. {
  568. stmt: &xsql.StreamStmt{
  569. Name: xsql.StreamName("demo"),
  570. StreamFields: []xsql.StreamField{
  571. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  572. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  573. },
  574. },
  575. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  576. result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  577. },
  578. {
  579. stmt: &xsql.StreamStmt{
  580. Name: xsql.StreamName("demo"),
  581. StreamFields: []xsql.StreamField{
  582. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  583. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  584. },
  585. Options: map[string]string{
  586. "DATASOURCE": "users",
  587. "FORMAT": "AVRO",
  588. "KEY": "USERID",
  589. "CONF_KEY": "srv1",
  590. "TYPE": "MQTT",
  591. "TIMESTAMP": "USERID",
  592. "TIMESTAMP_FORMAT": "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  593. },
  594. },
  595. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  596. result: &xsql.Tuple{Message: xsql.Message{
  597. "abc": common.TimeFromUnixMilli(1568894115000),
  598. "def": common.TimeFromUnixMilli(1568854573431),
  599. }},
  600. },
  601. //Array type
  602. {
  603. stmt: &xsql.StreamStmt{
  604. Name: xsql.StreamName("demo"),
  605. StreamFields: []xsql.StreamField{
  606. {Name: "a", FieldType: &xsql.ArrayType{
  607. Type: xsql.DATETIME,
  608. }},
  609. },
  610. },
  611. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  612. result: &xsql.Tuple{Message: xsql.Message{
  613. "a": []time.Time{
  614. common.TimeFromUnixMilli(1568854515123),
  615. common.TimeFromUnixMilli(1568854573431),
  616. },
  617. },
  618. },
  619. },
  620. {
  621. stmt: &xsql.StreamStmt{
  622. Name: xsql.StreamName("demo"),
  623. StreamFields: []xsql.StreamField{
  624. {Name: "a", FieldType: &xsql.RecType{
  625. StreamFields: []xsql.StreamField{
  626. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  627. {Name: "c", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  628. },
  629. }},
  630. },
  631. },
  632. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  633. result: &xsql.Tuple{Message: xsql.Message{
  634. "a": map[string]interface{}{
  635. "b": "hello",
  636. "c": common.TimeFromUnixMilli(1568854515000),
  637. },
  638. },
  639. },
  640. },
  641. }
  642. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  643. defer common.CloseLogger()
  644. contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
  645. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  646. for i, tt := range tests {
  647. pp := &Preprocessor{streamStmt: tt.stmt}
  648. dm := make(map[string]interface{})
  649. if e := json.Unmarshal(tt.data, &dm); e != nil {
  650. log.Fatal(e)
  651. return
  652. } else {
  653. tuple := &xsql.Tuple{Message: dm}
  654. result := pp.Apply(ctx, tuple)
  655. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  656. if rt, ok := result.(*xsql.Tuple); ok {
  657. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  658. rt.Message["abc"] = rtt.UTC()
  659. }
  660. }
  661. if !reflect.DeepEqual(tt.result, result) {
  662. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  663. }
  664. }
  665. }
  666. }
  667. func TestPreprocessorEventtime_Apply(t *testing.T) {
  668. var tests = []struct {
  669. stmt *xsql.StreamStmt
  670. data []byte
  671. result interface{}
  672. }{
  673. //Basic type
  674. {
  675. stmt: &xsql.StreamStmt{
  676. Name: xsql.StreamName("demo"),
  677. StreamFields: []xsql.StreamField{
  678. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  679. },
  680. Options: map[string]string{
  681. "DATASOURCE": "users",
  682. "FORMAT": "AVRO",
  683. "KEY": "USERID",
  684. "CONF_KEY": "srv1",
  685. "TYPE": "MQTT",
  686. "TIMESTAMP": "abc",
  687. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  688. },
  689. },
  690. data: []byte(`{"abc": 1568854515000}`),
  691. result: &xsql.Tuple{Message: xsql.Message{
  692. "abc": int(1568854515000),
  693. }, Timestamp: 1568854515000,
  694. },
  695. },
  696. {
  697. stmt: &xsql.StreamStmt{
  698. Name: xsql.StreamName("demo"),
  699. StreamFields: nil,
  700. Options: map[string]string{
  701. "DATASOURCE": "users",
  702. "FORMAT": "AVRO",
  703. "KEY": "USERID",
  704. "CONF_KEY": "srv1",
  705. "TYPE": "MQTT",
  706. "TIMESTAMP": "abc",
  707. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  708. },
  709. },
  710. data: []byte(`{"abc": 1568854515000}`),
  711. result: &xsql.Tuple{Message: xsql.Message{
  712. "abc": float64(1568854515000),
  713. }, Timestamp: 1568854515000,
  714. },
  715. },
  716. {
  717. stmt: &xsql.StreamStmt{
  718. Name: xsql.StreamName("demo"),
  719. StreamFields: []xsql.StreamField{
  720. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  721. },
  722. Options: map[string]string{
  723. "DATASOURCE": "users",
  724. "TIMESTAMP": "abc",
  725. },
  726. },
  727. data: []byte(`{"abc": true}`),
  728. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  729. },
  730. {
  731. stmt: &xsql.StreamStmt{
  732. Name: xsql.StreamName("demo"),
  733. StreamFields: []xsql.StreamField{
  734. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  735. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  736. },
  737. Options: map[string]string{
  738. "DATASOURCE": "users",
  739. "TIMESTAMP": "def",
  740. },
  741. },
  742. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  743. result: &xsql.Tuple{Message: xsql.Message{
  744. "abc": float64(34),
  745. "def": "2019-09-23T02:47:29.754Z",
  746. }, Timestamp: int64(1569206849754),
  747. },
  748. },
  749. {
  750. stmt: &xsql.StreamStmt{
  751. Name: xsql.StreamName("demo"),
  752. StreamFields: []xsql.StreamField{
  753. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  754. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  755. },
  756. Options: map[string]string{
  757. "DATASOURCE": "users",
  758. "TIMESTAMP": "abc",
  759. },
  760. },
  761. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  762. result: &xsql.Tuple{Message: xsql.Message{
  763. "abc": common.TimeFromUnixMilli(1568854515000),
  764. "def": common.TimeFromUnixMilli(1568854573431),
  765. }, Timestamp: int64(1568854515000),
  766. },
  767. },
  768. {
  769. stmt: &xsql.StreamStmt{
  770. Name: xsql.StreamName("demo"),
  771. StreamFields: []xsql.StreamField{
  772. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  773. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  774. },
  775. Options: map[string]string{
  776. "DATASOURCE": "users",
  777. "TIMESTAMP": "def",
  778. "TIMESTAMP_FORMAT": "yyyy-MM-dd'AT'HH:mm:ss",
  779. },
  780. },
  781. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  782. result: &xsql.Tuple{Message: xsql.Message{
  783. "abc": float64(34),
  784. "def": "2019-09-23AT02:47:29",
  785. }, Timestamp: int64(1569206849000),
  786. },
  787. },
  788. {
  789. stmt: &xsql.StreamStmt{
  790. Name: xsql.StreamName("demo"),
  791. StreamFields: []xsql.StreamField{
  792. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  793. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  794. },
  795. Options: map[string]string{
  796. "DATASOURCE": "users",
  797. "TIMESTAMP": "def",
  798. "TIMESTAMP_FORMAT": "yyyy-MM-ddaHH:mm:ss",
  799. },
  800. },
  801. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  802. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
  803. },
  804. }
  805. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  806. defer common.CloseLogger()
  807. contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  808. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  809. for i, tt := range tests {
  810. pp, err := NewPreprocessor(tt.stmt, nil, true)
  811. if err != nil {
  812. t.Error(err)
  813. }
  814. dm := make(map[string]interface{})
  815. if e := json.Unmarshal(tt.data, &dm); e != nil {
  816. log.Fatal(e)
  817. return
  818. } else {
  819. tuple := &xsql.Tuple{Message: dm}
  820. result := pp.Apply(ctx, tuple)
  821. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  822. if rt, ok := result.(*xsql.Tuple); ok {
  823. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  824. rt.Message["abc"] = rtt.UTC()
  825. }
  826. }
  827. if !reflect.DeepEqual(tt.result, result) {
  828. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  829. }
  830. }
  831. }
  832. }
  833. func TestPreprocessorError(t *testing.T) {
  834. tests := []struct {
  835. stmt *xsql.StreamStmt
  836. data []byte
  837. result interface{}
  838. }{
  839. {
  840. stmt: &xsql.StreamStmt{
  841. Name: xsql.StreamName("demo"),
  842. StreamFields: []xsql.StreamField{
  843. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  844. },
  845. },
  846. data: []byte(`{"abc": "dafsad"}`),
  847. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
  848. }, {
  849. stmt: &xsql.StreamStmt{
  850. Name: xsql.StreamName("demo"),
  851. StreamFields: []xsql.StreamField{
  852. {Name: "a", FieldType: &xsql.RecType{
  853. StreamFields: []xsql.StreamField{
  854. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  855. },
  856. }},
  857. },
  858. },
  859. data: []byte(`{"a": {"d" : "hello"}}`),
  860. result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
  861. }, {
  862. stmt: &xsql.StreamStmt{
  863. Name: xsql.StreamName("demo"),
  864. StreamFields: []xsql.StreamField{
  865. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  866. },
  867. Options: map[string]string{
  868. "DATASOURCE": "users",
  869. "FORMAT": "AVRO",
  870. "KEY": "USERID",
  871. "CONF_KEY": "srv1",
  872. "TYPE": "MQTT",
  873. "TIMESTAMP": "abc",
  874. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  875. },
  876. },
  877. data: []byte(`{"abc": "not a time"}`),
  878. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
  879. },
  880. }
  881. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  882. defer common.CloseLogger()
  883. contextLogger := common.Log.WithField("rule", "TestPreprocessorError")
  884. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  885. for i, tt := range tests {
  886. pp := &Preprocessor{streamStmt: tt.stmt}
  887. dm := make(map[string]interface{})
  888. if e := json.Unmarshal(tt.data, &dm); e != nil {
  889. log.Fatal(e)
  890. return
  891. } else {
  892. tuple := &xsql.Tuple{Message: dm}
  893. result := pp.Apply(ctx, tuple)
  894. if !reflect.DeepEqual(tt.result, result) {
  895. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  896. }
  897. }
  898. }
  899. }