preprocessor_test.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928
  1. package plans
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "log"
  10. "reflect"
  11. "testing"
  12. "time"
  13. )
  14. func TestPreprocessor_Apply(t *testing.T) {
  15. var tests = []struct {
  16. stmt *xsql.StreamStmt
  17. data []byte
  18. result interface{}
  19. }{
  20. //Basic type
  21. {
  22. stmt: &xsql.StreamStmt{
  23. Name: xsql.StreamName("demo"),
  24. StreamFields: []xsql.StreamField{
  25. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  26. },
  27. },
  28. data: []byte(`{"a": 6}`),
  29. result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
  30. },
  31. {
  32. stmt: &xsql.StreamStmt{
  33. Name: xsql.StreamName("demo"),
  34. StreamFields: []xsql.StreamField{
  35. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  36. },
  37. },
  38. data: []byte(`{"abc": null}`),
  39. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found <nil>(<nil>)"),
  40. },
  41. {
  42. stmt: &xsql.StreamStmt{
  43. Name: xsql.StreamName("demo"),
  44. StreamFields: nil,
  45. },
  46. data: []byte(`{"a": 6}`),
  47. result: &xsql.Tuple{Message: xsql.Message{
  48. "a": float64(6),
  49. },
  50. },
  51. },
  52. {
  53. stmt: &xsql.StreamStmt{
  54. Name: xsql.StreamName("demo"),
  55. StreamFields: []xsql.StreamField{
  56. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  57. },
  58. },
  59. data: []byte(`{"abc": 6}`),
  60. result: &xsql.Tuple{Message: xsql.Message{
  61. "abc": 6,
  62. },
  63. },
  64. },
  65. {
  66. stmt: &xsql.StreamStmt{
  67. Name: xsql.StreamName("demo"),
  68. StreamFields: nil,
  69. },
  70. data: []byte(`{"abc": 6}`),
  71. result: &xsql.Tuple{Message: xsql.Message{
  72. "abc": float64(6),
  73. },
  74. },
  75. },
  76. {
  77. stmt: &xsql.StreamStmt{
  78. Name: xsql.StreamName("demo"),
  79. StreamFields: []xsql.StreamField{
  80. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  81. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  82. },
  83. },
  84. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  85. result: &xsql.Tuple{Message: xsql.Message{
  86. "abc": float64(34),
  87. "def": "hello",
  88. },
  89. },
  90. },
  91. {
  92. stmt: &xsql.StreamStmt{
  93. Name: xsql.StreamName("demo"),
  94. StreamFields: nil,
  95. },
  96. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  97. result: &xsql.Tuple{Message: xsql.Message{
  98. "abc": float64(34),
  99. "def": "hello",
  100. "ghi": float64(50),
  101. },
  102. },
  103. },
  104. {
  105. stmt: &xsql.StreamStmt{
  106. Name: xsql.StreamName("demo"),
  107. StreamFields: []xsql.StreamField{
  108. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  109. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  110. },
  111. },
  112. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  113. result: &xsql.Tuple{Message: xsql.Message{
  114. "abc": float64(34),
  115. "def": "hello",
  116. },
  117. },
  118. },
  119. {
  120. stmt: &xsql.StreamStmt{
  121. Name: xsql.StreamName("demo"),
  122. StreamFields: []xsql.StreamField{
  123. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  124. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  125. },
  126. },
  127. data: []byte(`{"abc": 77, "def" : "hello"}`),
  128. result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
  129. },
  130. {
  131. stmt: &xsql.StreamStmt{
  132. Name: xsql.StreamName("demo"),
  133. StreamFields: []xsql.StreamField{
  134. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  135. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  136. },
  137. },
  138. data: []byte(`{"a": {"b" : "hello"}}`),
  139. result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
  140. },
  141. {
  142. stmt: &xsql.StreamStmt{
  143. Name: xsql.StreamName("demo"),
  144. StreamFields: nil,
  145. },
  146. data: []byte(`{"a": {"b" : "hello"}}`),
  147. result: &xsql.Tuple{Message: xsql.Message{
  148. "a": map[string]interface{}{
  149. "b": "hello",
  150. },
  151. },
  152. },
  153. },
  154. //Rec type
  155. {
  156. stmt: &xsql.StreamStmt{
  157. Name: xsql.StreamName("demo"),
  158. StreamFields: []xsql.StreamField{
  159. {Name: "a", FieldType: &xsql.RecType{
  160. StreamFields: []xsql.StreamField{
  161. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  162. },
  163. }},
  164. },
  165. },
  166. data: []byte(`{"a": {"b" : "hello"}}`),
  167. result: &xsql.Tuple{Message: xsql.Message{
  168. "a": map[string]interface{}{
  169. "b": "hello",
  170. },
  171. },
  172. },
  173. },
  174. {
  175. stmt: &xsql.StreamStmt{
  176. Name: xsql.StreamName("demo"),
  177. StreamFields: []xsql.StreamField{
  178. {Name: "a", FieldType: &xsql.RecType{
  179. StreamFields: []xsql.StreamField{
  180. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  181. },
  182. }},
  183. },
  184. },
  185. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  186. result: &xsql.Tuple{Message: xsql.Message{
  187. "a": map[string]interface{}{
  188. "b": float64(32),
  189. },
  190. },
  191. },
  192. },
  193. {
  194. stmt: &xsql.StreamStmt{
  195. Name: xsql.StreamName("demo"),
  196. StreamFields: nil,
  197. },
  198. data: []byte(`{"a": {"b" : "32"}}`),
  199. result: &xsql.Tuple{Message: xsql.Message{
  200. "a": map[string]interface{}{
  201. "b": "32",
  202. },
  203. },
  204. },
  205. },
  206. //Array of complex type
  207. {
  208. stmt: &xsql.StreamStmt{
  209. Name: xsql.StreamName("demo"),
  210. StreamFields: []xsql.StreamField{
  211. {Name: "a", FieldType: &xsql.ArrayType{
  212. Type: xsql.STRUCT,
  213. FieldType: &xsql.RecType{
  214. StreamFields: []xsql.StreamField{
  215. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  216. },
  217. },
  218. }},
  219. },
  220. },
  221. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  222. result: &xsql.Tuple{Message: xsql.Message{
  223. "a": []map[string]interface{}{
  224. {"b": "hello1"},
  225. {"b": "hello2"},
  226. },
  227. },
  228. },
  229. },
  230. {
  231. stmt: &xsql.StreamStmt{
  232. Name: xsql.StreamName("demo"),
  233. StreamFields: []xsql.StreamField{
  234. {Name: "a", FieldType: &xsql.ArrayType{
  235. Type: xsql.STRUCT,
  236. FieldType: &xsql.RecType{
  237. StreamFields: []xsql.StreamField{
  238. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  239. },
  240. },
  241. }},
  242. },
  243. },
  244. data: []byte(`{"a": []}`),
  245. result: &xsql.Tuple{Message: xsql.Message{
  246. "a": make([]map[string]interface{}, 0),
  247. },
  248. },
  249. },
  250. {
  251. stmt: &xsql.StreamStmt{
  252. Name: xsql.StreamName("demo"),
  253. StreamFields: []xsql.StreamField{
  254. {Name: "a", FieldType: &xsql.ArrayType{
  255. Type: xsql.STRUCT,
  256. FieldType: &xsql.RecType{
  257. StreamFields: []xsql.StreamField{
  258. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  259. },
  260. },
  261. }},
  262. },
  263. },
  264. data: []byte(`{"a": null}`),
  265. result: &xsql.Tuple{Message: xsql.Message{
  266. "a": []map[string]interface{}(nil),
  267. },
  268. },
  269. },
  270. {
  271. stmt: &xsql.StreamStmt{
  272. Name: xsql.StreamName("demo"),
  273. StreamFields: []xsql.StreamField{
  274. {Name: "a", FieldType: &xsql.ArrayType{
  275. Type: xsql.STRUCT,
  276. FieldType: &xsql.RecType{
  277. StreamFields: []xsql.StreamField{
  278. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  279. },
  280. },
  281. }},
  282. },
  283. },
  284. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  285. result: &xsql.Tuple{Message: xsql.Message{
  286. "a": []map[string]interface{}{
  287. nil,
  288. {"b": "hello2"},
  289. },
  290. },
  291. },
  292. },
  293. {
  294. stmt: &xsql.StreamStmt{
  295. Name: xsql.StreamName("demo"),
  296. StreamFields: []xsql.StreamField{
  297. {Name: "a", FieldType: &xsql.ArrayType{
  298. Type: xsql.ARRAY,
  299. FieldType: &xsql.ArrayType{
  300. Type: xsql.BIGINT,
  301. },
  302. }},
  303. },
  304. },
  305. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  306. result: &xsql.Tuple{Message: xsql.Message{
  307. "a": [][]int{
  308. {50, 60, 70},
  309. {66},
  310. {77},
  311. },
  312. },
  313. },
  314. },
  315. {
  316. stmt: &xsql.StreamStmt{
  317. Name: xsql.StreamName("demo"),
  318. StreamFields: []xsql.StreamField{
  319. {Name: "a", FieldType: &xsql.ArrayType{
  320. Type: xsql.ARRAY,
  321. FieldType: &xsql.ArrayType{
  322. Type: xsql.BIGINT,
  323. },
  324. }},
  325. },
  326. },
  327. data: []byte(`{"a": [null, [66], [77]]}`),
  328. result: &xsql.Tuple{Message: xsql.Message{
  329. "a": [][]int{
  330. []int(nil),
  331. {66},
  332. {77},
  333. },
  334. },
  335. },
  336. },
  337. {
  338. stmt: &xsql.StreamStmt{
  339. Name: xsql.StreamName("demo"),
  340. StreamFields: nil,
  341. },
  342. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  343. result: &xsql.Tuple{Message: xsql.Message{
  344. "a": []interface{}{
  345. map[string]interface{}{"b": "hello1"},
  346. map[string]interface{}{"b": "hello2"},
  347. },
  348. },
  349. },
  350. },
  351. {
  352. stmt: &xsql.StreamStmt{
  353. Name: xsql.StreamName("demo"),
  354. StreamFields: []xsql.StreamField{
  355. {Name: "a", FieldType: &xsql.ArrayType{
  356. Type: xsql.FLOAT,
  357. }},
  358. },
  359. },
  360. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  361. result: &xsql.Tuple{Message: xsql.Message{
  362. "a": []float64{
  363. 55,
  364. 77,
  365. },
  366. },
  367. },
  368. },
  369. {
  370. stmt: &xsql.StreamStmt{
  371. Name: xsql.StreamName("demo"),
  372. StreamFields: nil,
  373. },
  374. data: []byte(`{"a": [55, 77]}`),
  375. result: &xsql.Tuple{Message: xsql.Message{
  376. "a": []interface{}{
  377. float64(55),
  378. float64(77),
  379. },
  380. },
  381. },
  382. },
  383. //Rec of complex type
  384. {
  385. stmt: &xsql.StreamStmt{
  386. Name: xsql.StreamName("demo"),
  387. StreamFields: []xsql.StreamField{
  388. {Name: "a", FieldType: &xsql.RecType{
  389. StreamFields: []xsql.StreamField{
  390. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  391. {Name: "c", FieldType: &xsql.RecType{
  392. StreamFields: []xsql.StreamField{
  393. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  394. },
  395. }},
  396. },
  397. }},
  398. },
  399. },
  400. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  401. result: &xsql.Tuple{Message: xsql.Message{
  402. "a": map[string]interface{}{
  403. "b": "hello",
  404. "c": map[string]interface{}{
  405. "d": int(35),
  406. },
  407. },
  408. },
  409. },
  410. },
  411. {
  412. stmt: &xsql.StreamStmt{
  413. Name: xsql.StreamName("demo"),
  414. StreamFields: []xsql.StreamField{
  415. {Name: "a", FieldType: &xsql.RecType{
  416. StreamFields: []xsql.StreamField{
  417. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  418. {Name: "c", FieldType: &xsql.RecType{
  419. StreamFields: []xsql.StreamField{
  420. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  421. },
  422. }},
  423. },
  424. }},
  425. },
  426. },
  427. data: []byte(`{"a": null}`),
  428. result: &xsql.Tuple{Message: xsql.Message{
  429. "a": map[string]interface{}(nil),
  430. },
  431. },
  432. },
  433. {
  434. stmt: &xsql.StreamStmt{
  435. Name: xsql.StreamName("demo"),
  436. StreamFields: []xsql.StreamField{
  437. {Name: "a", FieldType: &xsql.RecType{
  438. StreamFields: []xsql.StreamField{
  439. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  440. {Name: "c", FieldType: &xsql.ArrayType{
  441. Type: xsql.FLOAT,
  442. }},
  443. },
  444. }},
  445. },
  446. },
  447. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  448. result: &xsql.Tuple{Message: xsql.Message{
  449. "a": map[string]interface{}{
  450. "b": "hello",
  451. "c": []float64{
  452. 35.2, 38.2,
  453. },
  454. },
  455. },
  456. },
  457. },
  458. {
  459. stmt: &xsql.StreamStmt{
  460. Name: xsql.StreamName("demo"),
  461. StreamFields: []xsql.StreamField{
  462. {Name: "a", FieldType: &xsql.RecType{
  463. StreamFields: []xsql.StreamField{
  464. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  465. {Name: "c", FieldType: &xsql.ArrayType{
  466. Type: xsql.FLOAT,
  467. }},
  468. },
  469. }},
  470. },
  471. },
  472. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  473. result: &xsql.Tuple{Message: xsql.Message{
  474. "a": map[string]interface{}{
  475. "b": "hello",
  476. "c": []float64(nil),
  477. },
  478. },
  479. },
  480. },
  481. {
  482. stmt: &xsql.StreamStmt{
  483. Name: xsql.StreamName("demo"),
  484. StreamFields: []xsql.StreamField{
  485. {Name: "a", FieldType: &xsql.RecType{
  486. StreamFields: []xsql.StreamField{
  487. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  488. {Name: "c", FieldType: &xsql.ArrayType{
  489. Type: xsql.FLOAT,
  490. }},
  491. },
  492. }},
  493. },
  494. },
  495. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  496. result: errors.New("error in preprocessor: fail to parse field c: invalid data type for [0], expect float but found <nil>(<nil>)"),
  497. },
  498. {
  499. stmt: &xsql.StreamStmt{
  500. Name: xsql.StreamName("demo"),
  501. StreamFields: nil,
  502. },
  503. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  504. result: &xsql.Tuple{Message: xsql.Message{
  505. "a": map[string]interface{}{
  506. "b": "hello",
  507. "c": map[string]interface{}{
  508. "d": 35.2,
  509. },
  510. },
  511. },
  512. },
  513. },
  514. }
  515. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  516. defer common.CloseLogger()
  517. contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
  518. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  519. for i, tt := range tests {
  520. pp := &Preprocessor{streamStmt: tt.stmt}
  521. dm := make(map[string]interface{})
  522. if e := json.Unmarshal(tt.data, &dm); e != nil {
  523. log.Fatal(e)
  524. return
  525. } else {
  526. tuple := &xsql.Tuple{Message: dm}
  527. fv, afv := xsql.NewAggregateFunctionValuers()
  528. result := pp.Apply(ctx, tuple, fv, afv)
  529. if !reflect.DeepEqual(tt.result, result) {
  530. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  531. }
  532. }
  533. }
  534. }
  535. func TestPreprocessorTime_Apply(t *testing.T) {
  536. var tests = []struct {
  537. stmt *xsql.StreamStmt
  538. data []byte
  539. result interface{}
  540. }{
  541. {
  542. stmt: &xsql.StreamStmt{
  543. Name: xsql.StreamName("demo"),
  544. StreamFields: []xsql.StreamField{
  545. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  546. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  547. },
  548. },
  549. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  550. result: &xsql.Tuple{Message: xsql.Message{
  551. "abc": common.TimeFromUnixMilli(1568854515000),
  552. "def": common.TimeFromUnixMilli(1568854573431),
  553. },
  554. },
  555. },
  556. {
  557. stmt: &xsql.StreamStmt{
  558. Name: xsql.StreamName("demo"),
  559. StreamFields: nil,
  560. },
  561. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  562. result: &xsql.Tuple{Message: xsql.Message{
  563. "abc": "2019-09-19T00:55:15.000Z",
  564. "def": float64(1568854573431),
  565. },
  566. },
  567. },
  568. {
  569. stmt: &xsql.StreamStmt{
  570. Name: xsql.StreamName("demo"),
  571. StreamFields: []xsql.StreamField{
  572. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  573. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  574. },
  575. },
  576. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  577. result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  578. },
  579. {
  580. stmt: &xsql.StreamStmt{
  581. Name: xsql.StreamName("demo"),
  582. StreamFields: []xsql.StreamField{
  583. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  584. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  585. },
  586. Options: map[string]string{
  587. "DATASOURCE": "users",
  588. "FORMAT": "AVRO",
  589. "KEY": "USERID",
  590. "CONF_KEY": "srv1",
  591. "TYPE": "MQTT",
  592. "TIMESTAMP": "USERID",
  593. "TIMESTAMP_FORMAT": "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  594. },
  595. },
  596. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  597. result: &xsql.Tuple{Message: xsql.Message{
  598. "abc": common.TimeFromUnixMilli(1568894115000),
  599. "def": common.TimeFromUnixMilli(1568854573431),
  600. }},
  601. },
  602. //Array type
  603. {
  604. stmt: &xsql.StreamStmt{
  605. Name: xsql.StreamName("demo"),
  606. StreamFields: []xsql.StreamField{
  607. {Name: "a", FieldType: &xsql.ArrayType{
  608. Type: xsql.DATETIME,
  609. }},
  610. },
  611. },
  612. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  613. result: &xsql.Tuple{Message: xsql.Message{
  614. "a": []time.Time{
  615. common.TimeFromUnixMilli(1568854515123),
  616. common.TimeFromUnixMilli(1568854573431),
  617. },
  618. },
  619. },
  620. },
  621. {
  622. stmt: &xsql.StreamStmt{
  623. Name: xsql.StreamName("demo"),
  624. StreamFields: []xsql.StreamField{
  625. {Name: "a", FieldType: &xsql.RecType{
  626. StreamFields: []xsql.StreamField{
  627. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  628. {Name: "c", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  629. },
  630. }},
  631. },
  632. },
  633. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  634. result: &xsql.Tuple{Message: xsql.Message{
  635. "a": map[string]interface{}{
  636. "b": "hello",
  637. "c": common.TimeFromUnixMilli(1568854515000),
  638. },
  639. },
  640. },
  641. },
  642. }
  643. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  644. defer common.CloseLogger()
  645. contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
  646. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  647. for i, tt := range tests {
  648. pp := &Preprocessor{streamStmt: tt.stmt}
  649. dm := make(map[string]interface{})
  650. if e := json.Unmarshal(tt.data, &dm); e != nil {
  651. log.Fatal(e)
  652. return
  653. } else {
  654. tuple := &xsql.Tuple{Message: dm}
  655. fv, afv := xsql.NewAggregateFunctionValuers()
  656. result := pp.Apply(ctx, tuple, fv, afv)
  657. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  658. if rt, ok := result.(*xsql.Tuple); ok {
  659. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  660. rt.Message["abc"] = rtt.UTC()
  661. }
  662. }
  663. if !reflect.DeepEqual(tt.result, result) {
  664. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  665. }
  666. }
  667. }
  668. }
  669. func TestPreprocessorEventtime_Apply(t *testing.T) {
  670. var tests = []struct {
  671. stmt *xsql.StreamStmt
  672. data []byte
  673. result interface{}
  674. }{
  675. //Basic type
  676. {
  677. stmt: &xsql.StreamStmt{
  678. Name: xsql.StreamName("demo"),
  679. StreamFields: []xsql.StreamField{
  680. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  681. },
  682. Options: map[string]string{
  683. "DATASOURCE": "users",
  684. "FORMAT": "AVRO",
  685. "KEY": "USERID",
  686. "CONF_KEY": "srv1",
  687. "TYPE": "MQTT",
  688. "TIMESTAMP": "abc",
  689. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  690. },
  691. },
  692. data: []byte(`{"abc": 1568854515000}`),
  693. result: &xsql.Tuple{Message: xsql.Message{
  694. "abc": int(1568854515000),
  695. }, Timestamp: 1568854515000,
  696. },
  697. },
  698. {
  699. stmt: &xsql.StreamStmt{
  700. Name: xsql.StreamName("demo"),
  701. StreamFields: nil,
  702. Options: map[string]string{
  703. "DATASOURCE": "users",
  704. "FORMAT": "AVRO",
  705. "KEY": "USERID",
  706. "CONF_KEY": "srv1",
  707. "TYPE": "MQTT",
  708. "TIMESTAMP": "abc",
  709. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  710. },
  711. },
  712. data: []byte(`{"abc": 1568854515000}`),
  713. result: &xsql.Tuple{Message: xsql.Message{
  714. "abc": float64(1568854515000),
  715. }, Timestamp: 1568854515000,
  716. },
  717. },
  718. {
  719. stmt: &xsql.StreamStmt{
  720. Name: xsql.StreamName("demo"),
  721. StreamFields: []xsql.StreamField{
  722. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  723. },
  724. Options: map[string]string{
  725. "DATASOURCE": "users",
  726. "TIMESTAMP": "abc",
  727. },
  728. },
  729. data: []byte(`{"abc": true}`),
  730. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  731. },
  732. {
  733. stmt: &xsql.StreamStmt{
  734. Name: xsql.StreamName("demo"),
  735. StreamFields: []xsql.StreamField{
  736. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  737. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  738. },
  739. Options: map[string]string{
  740. "DATASOURCE": "users",
  741. "TIMESTAMP": "def",
  742. },
  743. },
  744. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  745. result: &xsql.Tuple{Message: xsql.Message{
  746. "abc": float64(34),
  747. "def": "2019-09-23T02:47:29.754Z",
  748. }, Timestamp: int64(1569206849754),
  749. },
  750. },
  751. {
  752. stmt: &xsql.StreamStmt{
  753. Name: xsql.StreamName("demo"),
  754. StreamFields: []xsql.StreamField{
  755. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  756. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  757. },
  758. Options: map[string]string{
  759. "DATASOURCE": "users",
  760. "TIMESTAMP": "abc",
  761. },
  762. },
  763. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  764. result: &xsql.Tuple{Message: xsql.Message{
  765. "abc": common.TimeFromUnixMilli(1568854515000),
  766. "def": common.TimeFromUnixMilli(1568854573431),
  767. }, Timestamp: int64(1568854515000),
  768. },
  769. },
  770. {
  771. stmt: &xsql.StreamStmt{
  772. Name: xsql.StreamName("demo"),
  773. StreamFields: []xsql.StreamField{
  774. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  775. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  776. },
  777. Options: map[string]string{
  778. "DATASOURCE": "users",
  779. "TIMESTAMP": "def",
  780. "TIMESTAMP_FORMAT": "yyyy-MM-dd'AT'HH:mm:ss",
  781. },
  782. },
  783. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  784. result: &xsql.Tuple{Message: xsql.Message{
  785. "abc": float64(34),
  786. "def": "2019-09-23AT02:47:29",
  787. }, Timestamp: int64(1569206849000),
  788. },
  789. },
  790. {
  791. stmt: &xsql.StreamStmt{
  792. Name: xsql.StreamName("demo"),
  793. StreamFields: []xsql.StreamField{
  794. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  795. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  796. },
  797. Options: map[string]string{
  798. "DATASOURCE": "users",
  799. "TIMESTAMP": "def",
  800. "TIMESTAMP_FORMAT": "yyyy-MM-ddaHH:mm:ss",
  801. },
  802. },
  803. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  804. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
  805. },
  806. }
  807. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  808. defer common.CloseLogger()
  809. contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  810. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  811. for i, tt := range tests {
  812. pp, err := NewPreprocessor(tt.stmt, nil, true)
  813. if err != nil {
  814. t.Error(err)
  815. }
  816. dm := make(map[string]interface{})
  817. if e := json.Unmarshal(tt.data, &dm); e != nil {
  818. log.Fatal(e)
  819. return
  820. } else {
  821. tuple := &xsql.Tuple{Message: dm}
  822. fv, afv := xsql.NewAggregateFunctionValuers()
  823. result := pp.Apply(ctx, tuple, fv, afv)
  824. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  825. if rt, ok := result.(*xsql.Tuple); ok {
  826. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  827. rt.Message["abc"] = rtt.UTC()
  828. }
  829. }
  830. if !reflect.DeepEqual(tt.result, result) {
  831. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  832. }
  833. }
  834. }
  835. }
  836. func TestPreprocessorError(t *testing.T) {
  837. tests := []struct {
  838. stmt *xsql.StreamStmt
  839. data []byte
  840. result interface{}
  841. }{
  842. {
  843. stmt: &xsql.StreamStmt{
  844. Name: xsql.StreamName("demo"),
  845. StreamFields: []xsql.StreamField{
  846. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  847. },
  848. },
  849. data: []byte(`{"abc": "dafsad"}`),
  850. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
  851. }, {
  852. stmt: &xsql.StreamStmt{
  853. Name: xsql.StreamName("demo"),
  854. StreamFields: []xsql.StreamField{
  855. {Name: "a", FieldType: &xsql.RecType{
  856. StreamFields: []xsql.StreamField{
  857. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  858. },
  859. }},
  860. },
  861. },
  862. data: []byte(`{"a": {"d" : "hello"}}`),
  863. result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
  864. }, {
  865. stmt: &xsql.StreamStmt{
  866. Name: xsql.StreamName("demo"),
  867. StreamFields: []xsql.StreamField{
  868. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  869. },
  870. Options: map[string]string{
  871. "DATASOURCE": "users",
  872. "FORMAT": "AVRO",
  873. "KEY": "USERID",
  874. "CONF_KEY": "srv1",
  875. "TYPE": "MQTT",
  876. "TIMESTAMP": "abc",
  877. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  878. },
  879. },
  880. data: []byte(`{"abc": "not a time"}`),
  881. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
  882. },
  883. }
  884. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  885. defer common.CloseLogger()
  886. contextLogger := common.Log.WithField("rule", "TestPreprocessorError")
  887. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  888. for i, tt := range tests {
  889. pp := &Preprocessor{streamStmt: tt.stmt}
  890. dm := make(map[string]interface{})
  891. if e := json.Unmarshal(tt.data, &dm); e != nil {
  892. log.Fatal(e)
  893. return
  894. } else {
  895. tuple := &xsql.Tuple{Message: dm}
  896. fv, afv := xsql.NewAggregateFunctionValuers()
  897. result := pp.Apply(ctx, tuple, fv, afv)
  898. if !reflect.DeepEqual(tt.result, result) {
  899. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  900. }
  901. }
  902. }
  903. }