preprocessor_test.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080
  1. package operators
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xsql"
  9. "github.com/emqx/kuiper/xstream/contexts"
  10. "io/ioutil"
  11. "log"
  12. "path"
  13. "reflect"
  14. "testing"
  15. "time"
  16. )
  17. func TestPreprocessor_Apply(t *testing.T) {
  18. var tests = []struct {
  19. stmt *xsql.StreamStmt
  20. data []byte
  21. result interface{}
  22. }{
  23. //Basic type
  24. {
  25. stmt: &xsql.StreamStmt{
  26. Name: xsql.StreamName("demo"),
  27. StreamFields: []xsql.StreamField{
  28. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  29. },
  30. },
  31. data: []byte(`{"a": 6}`),
  32. result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
  33. },
  34. {
  35. stmt: &xsql.StreamStmt{
  36. Name: xsql.StreamName("demo"),
  37. StreamFields: []xsql.StreamField{
  38. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  39. },
  40. },
  41. data: []byte(`{"abc": null}`),
  42. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found <nil>(<nil>)"),
  43. },
  44. {
  45. stmt: &xsql.StreamStmt{
  46. Name: xsql.StreamName("demo"),
  47. StreamFields: nil,
  48. },
  49. data: []byte(`{"a": 6}`),
  50. result: &xsql.Tuple{Message: xsql.Message{
  51. "a": float64(6),
  52. },
  53. },
  54. },
  55. {
  56. stmt: &xsql.StreamStmt{
  57. Name: xsql.StreamName("demo"),
  58. StreamFields: []xsql.StreamField{
  59. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  60. },
  61. },
  62. data: []byte(`{"abc": 6}`),
  63. result: &xsql.Tuple{Message: xsql.Message{
  64. "abc": 6,
  65. },
  66. },
  67. },
  68. {
  69. stmt: &xsql.StreamStmt{
  70. Name: xsql.StreamName("demo"),
  71. StreamFields: nil,
  72. },
  73. data: []byte(`{"abc": 6}`),
  74. result: &xsql.Tuple{Message: xsql.Message{
  75. "abc": float64(6),
  76. },
  77. },
  78. },
  79. {
  80. stmt: &xsql.StreamStmt{
  81. Name: xsql.StreamName("demo"),
  82. StreamFields: []xsql.StreamField{
  83. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  84. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  85. },
  86. },
  87. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  88. result: &xsql.Tuple{Message: xsql.Message{
  89. "abc": float64(34),
  90. "def": "hello",
  91. },
  92. },
  93. },
  94. {
  95. stmt: &xsql.StreamStmt{
  96. Name: xsql.StreamName("demo"),
  97. StreamFields: nil,
  98. },
  99. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  100. result: &xsql.Tuple{Message: xsql.Message{
  101. "abc": float64(34),
  102. "def": "hello",
  103. "ghi": float64(50),
  104. },
  105. },
  106. },
  107. {
  108. stmt: &xsql.StreamStmt{
  109. Name: xsql.StreamName("demo"),
  110. StreamFields: []xsql.StreamField{
  111. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  112. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  113. },
  114. },
  115. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  116. result: &xsql.Tuple{Message: xsql.Message{
  117. "abc": float64(34),
  118. "def": "hello",
  119. },
  120. },
  121. },
  122. {
  123. stmt: &xsql.StreamStmt{
  124. Name: xsql.StreamName("demo"),
  125. StreamFields: []xsql.StreamField{
  126. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  127. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  128. },
  129. },
  130. data: []byte(`{"abc": 77, "def" : "hello"}`),
  131. result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
  132. },
  133. {
  134. stmt: &xsql.StreamStmt{
  135. Name: xsql.StreamName("demo"),
  136. StreamFields: []xsql.StreamField{
  137. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  138. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  139. },
  140. },
  141. data: []byte(`{"a": {"b" : "hello"}}`),
  142. result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
  143. },
  144. {
  145. stmt: &xsql.StreamStmt{
  146. Name: xsql.StreamName("demo"),
  147. StreamFields: nil,
  148. },
  149. data: []byte(`{"a": {"b" : "hello"}}`),
  150. result: &xsql.Tuple{Message: xsql.Message{
  151. "a": map[string]interface{}{
  152. "b": "hello",
  153. },
  154. },
  155. },
  156. },
  157. //Rec type
  158. {
  159. stmt: &xsql.StreamStmt{
  160. Name: xsql.StreamName("demo"),
  161. StreamFields: []xsql.StreamField{
  162. {Name: "a", FieldType: &xsql.RecType{
  163. StreamFields: []xsql.StreamField{
  164. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  165. },
  166. }},
  167. },
  168. },
  169. data: []byte(`{"a": {"b" : "hello"}}`),
  170. result: &xsql.Tuple{Message: xsql.Message{
  171. "a": map[string]interface{}{
  172. "b": "hello",
  173. },
  174. },
  175. },
  176. },
  177. {
  178. stmt: &xsql.StreamStmt{
  179. Name: xsql.StreamName("demo"),
  180. StreamFields: []xsql.StreamField{
  181. {Name: "a", FieldType: &xsql.RecType{
  182. StreamFields: []xsql.StreamField{
  183. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  184. },
  185. }},
  186. },
  187. },
  188. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  189. result: &xsql.Tuple{Message: xsql.Message{
  190. "a": map[string]interface{}{
  191. "b": float64(32),
  192. },
  193. },
  194. },
  195. },
  196. {
  197. stmt: &xsql.StreamStmt{
  198. Name: xsql.StreamName("demo"),
  199. StreamFields: nil,
  200. },
  201. data: []byte(`{"a": {"b" : "32"}}`),
  202. result: &xsql.Tuple{Message: xsql.Message{
  203. "a": map[string]interface{}{
  204. "b": "32",
  205. },
  206. },
  207. },
  208. },
  209. //Array of complex type
  210. {
  211. stmt: &xsql.StreamStmt{
  212. Name: xsql.StreamName("demo"),
  213. StreamFields: []xsql.StreamField{
  214. {Name: "a", FieldType: &xsql.ArrayType{
  215. Type: xsql.STRUCT,
  216. FieldType: &xsql.RecType{
  217. StreamFields: []xsql.StreamField{
  218. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  219. },
  220. },
  221. }},
  222. },
  223. },
  224. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  225. result: &xsql.Tuple{Message: xsql.Message{
  226. "a": []map[string]interface{}{
  227. {"b": "hello1"},
  228. {"b": "hello2"},
  229. },
  230. },
  231. },
  232. },
  233. {
  234. stmt: &xsql.StreamStmt{
  235. Name: xsql.StreamName("demo"),
  236. StreamFields: []xsql.StreamField{
  237. {Name: "a", FieldType: &xsql.ArrayType{
  238. Type: xsql.STRUCT,
  239. FieldType: &xsql.RecType{
  240. StreamFields: []xsql.StreamField{
  241. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  242. },
  243. },
  244. }},
  245. },
  246. },
  247. data: []byte(`{"a": []}`),
  248. result: &xsql.Tuple{Message: xsql.Message{
  249. "a": make([]map[string]interface{}, 0),
  250. },
  251. },
  252. },
  253. {
  254. stmt: &xsql.StreamStmt{
  255. Name: xsql.StreamName("demo"),
  256. StreamFields: []xsql.StreamField{
  257. {Name: "a", FieldType: &xsql.ArrayType{
  258. Type: xsql.STRUCT,
  259. FieldType: &xsql.RecType{
  260. StreamFields: []xsql.StreamField{
  261. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  262. },
  263. },
  264. }},
  265. },
  266. },
  267. data: []byte(`{"a": null}`),
  268. result: &xsql.Tuple{Message: xsql.Message{
  269. "a": []map[string]interface{}(nil),
  270. },
  271. },
  272. },
  273. {
  274. stmt: &xsql.StreamStmt{
  275. Name: xsql.StreamName("demo"),
  276. StreamFields: []xsql.StreamField{
  277. {Name: "a", FieldType: &xsql.ArrayType{
  278. Type: xsql.STRUCT,
  279. FieldType: &xsql.RecType{
  280. StreamFields: []xsql.StreamField{
  281. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  282. },
  283. },
  284. }},
  285. },
  286. },
  287. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  288. result: &xsql.Tuple{Message: xsql.Message{
  289. "a": []map[string]interface{}{
  290. nil,
  291. {"b": "hello2"},
  292. },
  293. },
  294. },
  295. },
  296. {
  297. stmt: &xsql.StreamStmt{
  298. Name: xsql.StreamName("demo"),
  299. StreamFields: []xsql.StreamField{
  300. {Name: "a", FieldType: &xsql.ArrayType{
  301. Type: xsql.ARRAY,
  302. FieldType: &xsql.ArrayType{
  303. Type: xsql.BIGINT,
  304. },
  305. }},
  306. },
  307. },
  308. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  309. result: &xsql.Tuple{Message: xsql.Message{
  310. "a": [][]int{
  311. {50, 60, 70},
  312. {66},
  313. {77},
  314. },
  315. },
  316. },
  317. },
  318. {
  319. stmt: &xsql.StreamStmt{
  320. Name: xsql.StreamName("demo"),
  321. StreamFields: []xsql.StreamField{
  322. {Name: "a", FieldType: &xsql.ArrayType{
  323. Type: xsql.ARRAY,
  324. FieldType: &xsql.ArrayType{
  325. Type: xsql.BIGINT,
  326. },
  327. }},
  328. },
  329. },
  330. data: []byte(`{"a": [null, [66], [77]]}`),
  331. result: &xsql.Tuple{Message: xsql.Message{
  332. "a": [][]int{
  333. []int(nil),
  334. {66},
  335. {77},
  336. },
  337. },
  338. },
  339. },
  340. {
  341. stmt: &xsql.StreamStmt{
  342. Name: xsql.StreamName("demo"),
  343. StreamFields: nil,
  344. },
  345. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  346. result: &xsql.Tuple{Message: xsql.Message{
  347. "a": []interface{}{
  348. map[string]interface{}{"b": "hello1"},
  349. map[string]interface{}{"b": "hello2"},
  350. },
  351. },
  352. },
  353. },
  354. {
  355. stmt: &xsql.StreamStmt{
  356. Name: xsql.StreamName("demo"),
  357. StreamFields: []xsql.StreamField{
  358. {Name: "a", FieldType: &xsql.ArrayType{
  359. Type: xsql.FLOAT,
  360. }},
  361. },
  362. },
  363. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  364. result: &xsql.Tuple{Message: xsql.Message{
  365. "a": []float64{
  366. 55,
  367. 77,
  368. },
  369. },
  370. },
  371. },
  372. {
  373. stmt: &xsql.StreamStmt{
  374. Name: xsql.StreamName("demo"),
  375. StreamFields: nil,
  376. },
  377. data: []byte(`{"a": [55, 77]}`),
  378. result: &xsql.Tuple{Message: xsql.Message{
  379. "a": []interface{}{
  380. float64(55),
  381. float64(77),
  382. },
  383. },
  384. },
  385. },
  386. //Rec of complex type
  387. {
  388. stmt: &xsql.StreamStmt{
  389. Name: xsql.StreamName("demo"),
  390. StreamFields: []xsql.StreamField{
  391. {Name: "a", FieldType: &xsql.RecType{
  392. StreamFields: []xsql.StreamField{
  393. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  394. {Name: "c", FieldType: &xsql.RecType{
  395. StreamFields: []xsql.StreamField{
  396. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  397. },
  398. }},
  399. },
  400. }},
  401. },
  402. },
  403. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  404. result: &xsql.Tuple{Message: xsql.Message{
  405. "a": map[string]interface{}{
  406. "b": "hello",
  407. "c": map[string]interface{}{
  408. "d": int(35),
  409. },
  410. },
  411. },
  412. },
  413. },
  414. {
  415. stmt: &xsql.StreamStmt{
  416. Name: xsql.StreamName("demo"),
  417. StreamFields: []xsql.StreamField{
  418. {Name: "a", FieldType: &xsql.RecType{
  419. StreamFields: []xsql.StreamField{
  420. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  421. {Name: "c", FieldType: &xsql.RecType{
  422. StreamFields: []xsql.StreamField{
  423. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  424. },
  425. }},
  426. },
  427. }},
  428. },
  429. },
  430. data: []byte(`{"a": null}`),
  431. result: &xsql.Tuple{Message: xsql.Message{
  432. "a": map[string]interface{}(nil),
  433. },
  434. },
  435. },
  436. {
  437. stmt: &xsql.StreamStmt{
  438. Name: xsql.StreamName("demo"),
  439. StreamFields: []xsql.StreamField{
  440. {Name: "a", FieldType: &xsql.RecType{
  441. StreamFields: []xsql.StreamField{
  442. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  443. {Name: "c", FieldType: &xsql.ArrayType{
  444. Type: xsql.FLOAT,
  445. }},
  446. },
  447. }},
  448. },
  449. },
  450. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  451. result: &xsql.Tuple{Message: xsql.Message{
  452. "a": map[string]interface{}{
  453. "b": "hello",
  454. "c": []float64{
  455. 35.2, 38.2,
  456. },
  457. },
  458. },
  459. },
  460. },
  461. {
  462. stmt: &xsql.StreamStmt{
  463. Name: xsql.StreamName("demo"),
  464. StreamFields: []xsql.StreamField{
  465. {Name: "a", FieldType: &xsql.RecType{
  466. StreamFields: []xsql.StreamField{
  467. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  468. {Name: "c", FieldType: &xsql.ArrayType{
  469. Type: xsql.FLOAT,
  470. }},
  471. },
  472. }},
  473. },
  474. },
  475. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  476. result: &xsql.Tuple{Message: xsql.Message{
  477. "a": map[string]interface{}{
  478. "b": "hello",
  479. "c": []float64(nil),
  480. },
  481. },
  482. },
  483. },
  484. {
  485. stmt: &xsql.StreamStmt{
  486. Name: xsql.StreamName("demo"),
  487. StreamFields: []xsql.StreamField{
  488. {Name: "a", FieldType: &xsql.RecType{
  489. StreamFields: []xsql.StreamField{
  490. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  491. {Name: "c", FieldType: &xsql.ArrayType{
  492. Type: xsql.FLOAT,
  493. }},
  494. },
  495. }},
  496. },
  497. },
  498. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  499. result: errors.New("error in preprocessor: fail to parse field c: invalid data type for [0], expect float but found <nil>(<nil>)"),
  500. },
  501. {
  502. stmt: &xsql.StreamStmt{
  503. Name: xsql.StreamName("demo"),
  504. StreamFields: nil,
  505. },
  506. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  507. result: &xsql.Tuple{Message: xsql.Message{
  508. "a": map[string]interface{}{
  509. "b": "hello",
  510. "c": map[string]interface{}{
  511. "d": 35.2,
  512. },
  513. },
  514. },
  515. },
  516. },
  517. }
  518. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  519. defer common.CloseLogger()
  520. contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
  521. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  522. for i, tt := range tests {
  523. pp := &Preprocessor{}
  524. pp.streamFields = convertFields(tt.stmt.StreamFields)
  525. dm := make(map[string]interface{})
  526. if e := json.Unmarshal(tt.data, &dm); e != nil {
  527. log.Fatal(e)
  528. return
  529. } else {
  530. tuple := &xsql.Tuple{Message: dm}
  531. fv, afv := xsql.NewFunctionValuersForOp(nil)
  532. result := pp.Apply(ctx, tuple, fv, afv)
  533. if !reflect.DeepEqual(tt.result, result) {
  534. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  535. }
  536. }
  537. }
  538. }
  539. func TestPreprocessorTime_Apply(t *testing.T) {
  540. var tests = []struct {
  541. stmt *xsql.StreamStmt
  542. data []byte
  543. result interface{}
  544. }{
  545. {
  546. stmt: &xsql.StreamStmt{
  547. Name: xsql.StreamName("demo"),
  548. StreamFields: []xsql.StreamField{
  549. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  550. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  551. },
  552. },
  553. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  554. result: &xsql.Tuple{Message: xsql.Message{
  555. "abc": common.TimeFromUnixMilli(1568854515000),
  556. "def": common.TimeFromUnixMilli(1568854573431),
  557. },
  558. },
  559. },
  560. {
  561. stmt: &xsql.StreamStmt{
  562. Name: xsql.StreamName("demo"),
  563. StreamFields: nil,
  564. },
  565. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  566. result: &xsql.Tuple{Message: xsql.Message{
  567. "abc": "2019-09-19T00:55:15.000Z",
  568. "def": float64(1568854573431),
  569. },
  570. },
  571. },
  572. {
  573. stmt: &xsql.StreamStmt{
  574. Name: xsql.StreamName("demo"),
  575. StreamFields: []xsql.StreamField{
  576. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  577. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  578. },
  579. },
  580. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  581. result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  582. },
  583. {
  584. stmt: &xsql.StreamStmt{
  585. Name: xsql.StreamName("demo"),
  586. StreamFields: []xsql.StreamField{
  587. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  588. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  589. },
  590. Options: map[string]string{
  591. "DATASOURCE": "users",
  592. "FORMAT": "JSON",
  593. "KEY": "USERID",
  594. "CONF_KEY": "srv1",
  595. "TYPE": "MQTT",
  596. "TIMESTAMP": "USERID",
  597. "TIMESTAMP_FORMAT": "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  598. },
  599. },
  600. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  601. result: &xsql.Tuple{Message: xsql.Message{
  602. "abc": common.TimeFromUnixMilli(1568894115000),
  603. "def": common.TimeFromUnixMilli(1568854573431),
  604. }},
  605. },
  606. //Array type
  607. {
  608. stmt: &xsql.StreamStmt{
  609. Name: xsql.StreamName("demo"),
  610. StreamFields: []xsql.StreamField{
  611. {Name: "a", FieldType: &xsql.ArrayType{
  612. Type: xsql.DATETIME,
  613. }},
  614. },
  615. },
  616. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  617. result: &xsql.Tuple{Message: xsql.Message{
  618. "a": []time.Time{
  619. common.TimeFromUnixMilli(1568854515123),
  620. common.TimeFromUnixMilli(1568854573431),
  621. },
  622. },
  623. },
  624. },
  625. {
  626. stmt: &xsql.StreamStmt{
  627. Name: xsql.StreamName("demo"),
  628. StreamFields: []xsql.StreamField{
  629. {Name: "a", FieldType: &xsql.RecType{
  630. StreamFields: []xsql.StreamField{
  631. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  632. {Name: "c", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  633. },
  634. }},
  635. },
  636. },
  637. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  638. result: &xsql.Tuple{Message: xsql.Message{
  639. "a": map[string]interface{}{
  640. "b": "hello",
  641. "c": common.TimeFromUnixMilli(1568854515000),
  642. },
  643. },
  644. },
  645. },
  646. }
  647. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  648. defer common.CloseLogger()
  649. contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
  650. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  651. for i, tt := range tests {
  652. pp := &Preprocessor{}
  653. pp.streamFields = convertFields(tt.stmt.StreamFields)
  654. pp.timestampFormat = tt.stmt.Options["TIMESTAMP_FORMAT"]
  655. dm := make(map[string]interface{})
  656. if e := json.Unmarshal(tt.data, &dm); e != nil {
  657. log.Fatal(e)
  658. return
  659. } else {
  660. tuple := &xsql.Tuple{Message: dm}
  661. fv, afv := xsql.NewFunctionValuersForOp(nil)
  662. result := pp.Apply(ctx, tuple, fv, afv)
  663. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  664. if rt, ok := result.(*xsql.Tuple); ok {
  665. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  666. rt.Message["abc"] = rtt.UTC()
  667. }
  668. }
  669. if !reflect.DeepEqual(tt.result, result) {
  670. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  671. }
  672. }
  673. }
  674. }
  675. func convertFields(o xsql.StreamFields) []interface{} {
  676. if o == nil {
  677. return nil
  678. }
  679. fields := make([]interface{}, len(o))
  680. for i, _ := range o {
  681. fields[i] = &o[i]
  682. }
  683. return fields
  684. }
  685. func TestPreprocessorEventtime_Apply(t *testing.T) {
  686. var tests = []struct {
  687. stmt *xsql.StreamStmt
  688. data []byte
  689. result interface{}
  690. }{
  691. //Basic type
  692. {
  693. stmt: &xsql.StreamStmt{
  694. Name: xsql.StreamName("demo"),
  695. StreamFields: []xsql.StreamField{
  696. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  697. },
  698. Options: map[string]string{
  699. "DATASOURCE": "users",
  700. "FORMAT": "JSON",
  701. "KEY": "USERID",
  702. "CONF_KEY": "srv1",
  703. "TYPE": "MQTT",
  704. "TIMESTAMP": "abc",
  705. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  706. },
  707. },
  708. data: []byte(`{"abc": 1568854515000}`),
  709. result: &xsql.Tuple{Message: xsql.Message{
  710. "abc": int(1568854515000),
  711. }, Timestamp: 1568854515000,
  712. },
  713. },
  714. {
  715. stmt: &xsql.StreamStmt{
  716. Name: xsql.StreamName("demo"),
  717. StreamFields: nil,
  718. Options: map[string]string{
  719. "DATASOURCE": "users",
  720. "FORMAT": "JSON",
  721. "KEY": "USERID",
  722. "CONF_KEY": "srv1",
  723. "TYPE": "MQTT",
  724. "TIMESTAMP": "abc",
  725. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  726. },
  727. },
  728. data: []byte(`{"abc": 1568854515000}`),
  729. result: &xsql.Tuple{Message: xsql.Message{
  730. "abc": float64(1568854515000),
  731. }, Timestamp: 1568854515000,
  732. },
  733. },
  734. {
  735. stmt: &xsql.StreamStmt{
  736. Name: xsql.StreamName("demo"),
  737. StreamFields: []xsql.StreamField{
  738. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  739. },
  740. Options: map[string]string{
  741. "DATASOURCE": "users",
  742. "TIMESTAMP": "abc",
  743. },
  744. },
  745. data: []byte(`{"abc": true}`),
  746. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  747. },
  748. {
  749. stmt: &xsql.StreamStmt{
  750. Name: xsql.StreamName("demo"),
  751. StreamFields: []xsql.StreamField{
  752. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  753. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  754. },
  755. Options: map[string]string{
  756. "DATASOURCE": "users",
  757. "TIMESTAMP": "def",
  758. },
  759. },
  760. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  761. result: &xsql.Tuple{Message: xsql.Message{
  762. "abc": float64(34),
  763. "def": "2019-09-23T02:47:29.754Z",
  764. }, Timestamp: int64(1569206849754),
  765. },
  766. },
  767. {
  768. stmt: &xsql.StreamStmt{
  769. Name: xsql.StreamName("demo"),
  770. StreamFields: []xsql.StreamField{
  771. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  772. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  773. },
  774. Options: map[string]string{
  775. "DATASOURCE": "users",
  776. "TIMESTAMP": "abc",
  777. },
  778. },
  779. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  780. result: &xsql.Tuple{Message: xsql.Message{
  781. "abc": common.TimeFromUnixMilli(1568854515000),
  782. "def": common.TimeFromUnixMilli(1568854573431),
  783. }, Timestamp: int64(1568854515000),
  784. },
  785. },
  786. {
  787. stmt: &xsql.StreamStmt{
  788. Name: xsql.StreamName("demo"),
  789. StreamFields: []xsql.StreamField{
  790. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  791. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  792. },
  793. Options: map[string]string{
  794. "DATASOURCE": "users",
  795. "TIMESTAMP": "def",
  796. "TIMESTAMP_FORMAT": "yyyy-MM-dd'AT'HH:mm:ss",
  797. },
  798. },
  799. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  800. result: &xsql.Tuple{Message: xsql.Message{
  801. "abc": float64(34),
  802. "def": "2019-09-23AT02:47:29",
  803. }, Timestamp: int64(1569206849000),
  804. },
  805. },
  806. {
  807. stmt: &xsql.StreamStmt{
  808. Name: xsql.StreamName("demo"),
  809. StreamFields: []xsql.StreamField{
  810. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  811. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  812. },
  813. Options: map[string]string{
  814. "DATASOURCE": "users",
  815. "TIMESTAMP": "def",
  816. "TIMESTAMP_FORMAT": "yyyy-MM-ddaHH:mm:ss",
  817. },
  818. },
  819. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  820. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
  821. },
  822. }
  823. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  824. defer common.CloseLogger()
  825. contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  826. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  827. for i, tt := range tests {
  828. pp := &Preprocessor{
  829. defaultFieldProcessor: defaultFieldProcessor{
  830. streamFields: convertFields(tt.stmt.StreamFields),
  831. aliasFields: nil,
  832. isBinary: false,
  833. timestampFormat: tt.stmt.Options["TIMESTAMP_FORMAT"],
  834. },
  835. isEventTime: true,
  836. timestampField: tt.stmt.Options["TIMESTAMP"],
  837. }
  838. dm := make(map[string]interface{})
  839. if e := json.Unmarshal(tt.data, &dm); e != nil {
  840. log.Fatal(e)
  841. return
  842. } else {
  843. tuple := &xsql.Tuple{Message: dm}
  844. fv, afv := xsql.NewFunctionValuersForOp(nil)
  845. result := pp.Apply(ctx, tuple, fv, afv)
  846. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  847. if rt, ok := result.(*xsql.Tuple); ok {
  848. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  849. rt.Message["abc"] = rtt.UTC()
  850. }
  851. }
  852. if !reflect.DeepEqual(tt.result, result) {
  853. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  854. }
  855. }
  856. }
  857. }
  858. func TestPreprocessorError(t *testing.T) {
  859. tests := []struct {
  860. stmt *xsql.StreamStmt
  861. data []byte
  862. result interface{}
  863. }{
  864. {
  865. stmt: &xsql.StreamStmt{
  866. Name: xsql.StreamName("demo"),
  867. StreamFields: []xsql.StreamField{
  868. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  869. },
  870. },
  871. data: []byte(`{"abc": "dafsad"}`),
  872. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
  873. }, {
  874. stmt: &xsql.StreamStmt{
  875. Name: xsql.StreamName("demo"),
  876. StreamFields: []xsql.StreamField{
  877. {Name: "a", FieldType: &xsql.RecType{
  878. StreamFields: []xsql.StreamField{
  879. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  880. },
  881. }},
  882. },
  883. },
  884. data: []byte(`{"a": {"d" : "hello"}}`),
  885. result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
  886. }, {
  887. stmt: &xsql.StreamStmt{
  888. Name: xsql.StreamName("demo"),
  889. StreamFields: []xsql.StreamField{
  890. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  891. },
  892. Options: map[string]string{
  893. "DATASOURCE": "users",
  894. "FORMAT": "JSON",
  895. "KEY": "USERID",
  896. "CONF_KEY": "srv1",
  897. "TYPE": "MQTT",
  898. "TIMESTAMP": "abc",
  899. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  900. },
  901. },
  902. data: []byte(`{"abc": "not a time"}`),
  903. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
  904. },
  905. }
  906. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  907. defer common.CloseLogger()
  908. contextLogger := common.Log.WithField("rule", "TestPreprocessorError")
  909. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  910. for i, tt := range tests {
  911. pp := &Preprocessor{}
  912. pp.streamFields = convertFields(tt.stmt.StreamFields)
  913. dm := make(map[string]interface{})
  914. if e := json.Unmarshal(tt.data, &dm); e != nil {
  915. log.Fatal(e)
  916. return
  917. } else {
  918. tuple := &xsql.Tuple{Message: dm}
  919. fv, afv := xsql.NewFunctionValuersForOp(nil)
  920. result := pp.Apply(ctx, tuple, fv, afv)
  921. if !reflect.DeepEqual(tt.result, result) {
  922. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  923. }
  924. }
  925. }
  926. }
  927. func TestPreprocessorForBinary(t *testing.T) {
  928. docsFolder, err := common.GetLoc("/docs/")
  929. if err != nil {
  930. t.Errorf("Cannot find docs folder: %v", err)
  931. }
  932. image, err := ioutil.ReadFile(path.Join(docsFolder, "cover.jpg"))
  933. if err != nil {
  934. t.Errorf("Cannot read image: %v", err)
  935. }
  936. b64img := base64.StdEncoding.EncodeToString(image)
  937. //TODO test bytea type conversion to string or else
  938. var tests = []struct {
  939. stmt *xsql.StreamStmt
  940. data []byte
  941. isBinary bool
  942. result interface{}
  943. }{
  944. {
  945. stmt: &xsql.StreamStmt{
  946. Name: xsql.StreamName("demo"),
  947. StreamFields: nil,
  948. },
  949. data: image,
  950. isBinary: true,
  951. result: &xsql.Tuple{Message: xsql.Message{
  952. "self": image,
  953. },
  954. },
  955. },
  956. {
  957. stmt: &xsql.StreamStmt{
  958. Name: xsql.StreamName("demo"),
  959. StreamFields: []xsql.StreamField{
  960. {Name: "img", FieldType: &xsql.BasicType{Type: xsql.BYTEA}},
  961. },
  962. },
  963. data: image,
  964. isBinary: true,
  965. result: &xsql.Tuple{Message: xsql.Message{
  966. "img": image,
  967. },
  968. },
  969. },
  970. {
  971. stmt: &xsql.StreamStmt{
  972. Name: xsql.StreamName("demo"),
  973. StreamFields: []xsql.StreamField{
  974. {Name: "a", FieldType: &xsql.RecType{
  975. StreamFields: []xsql.StreamField{
  976. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.BYTEA}},
  977. },
  978. }},
  979. },
  980. },
  981. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  982. result: &xsql.Tuple{Message: xsql.Message{
  983. "a": map[string]interface{}{
  984. "b": image,
  985. },
  986. },
  987. },
  988. },
  989. {
  990. stmt: &xsql.StreamStmt{
  991. Name: xsql.StreamName("demo"),
  992. StreamFields: []xsql.StreamField{
  993. {Name: "a", FieldType: &xsql.ArrayType{
  994. Type: xsql.BYTEA,
  995. }},
  996. },
  997. },
  998. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  999. result: &xsql.Tuple{Message: xsql.Message{
  1000. "a": [][]byte{
  1001. image,
  1002. },
  1003. },
  1004. },
  1005. },
  1006. {
  1007. stmt: &xsql.StreamStmt{
  1008. Name: xsql.StreamName("demo"),
  1009. StreamFields: []xsql.StreamField{
  1010. {Name: "a", FieldType: &xsql.ArrayType{
  1011. Type: xsql.STRUCT,
  1012. FieldType: &xsql.RecType{
  1013. StreamFields: []xsql.StreamField{
  1014. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.BYTEA}},
  1015. },
  1016. },
  1017. }},
  1018. },
  1019. },
  1020. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  1021. result: &xsql.Tuple{Message: xsql.Message{
  1022. "a": []map[string]interface{}{
  1023. {"b": image},
  1024. },
  1025. },
  1026. },
  1027. },
  1028. }
  1029. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1030. defer common.CloseLogger()
  1031. contextLogger := common.Log.WithField("rule", "TestPreprocessorForBinary")
  1032. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1033. for i, tt := range tests {
  1034. pp := &Preprocessor{}
  1035. pp.streamFields = convertFields(tt.stmt.StreamFields)
  1036. pp.isBinary = tt.isBinary
  1037. format := "json"
  1038. if tt.isBinary {
  1039. format = "binary"
  1040. }
  1041. if dm, e := common.MessageDecode(tt.data, format); e != nil {
  1042. log.Fatal(e)
  1043. return
  1044. } else {
  1045. tuple := &xsql.Tuple{Message: dm}
  1046. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1047. result := pp.Apply(ctx, tuple, fv, afv)
  1048. if !reflect.DeepEqual(tt.result, result) {
  1049. t.Errorf("%d. %q\n\nresult mismatch", i, tuple)
  1050. }
  1051. }
  1052. }
  1053. }