preprocessor_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722
  1. package plans
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "log"
  10. "reflect"
  11. "testing"
  12. "time"
  13. )
  14. func TestPreprocessor_Apply(t *testing.T) {
  15. var tests = []struct {
  16. stmt *xsql.StreamStmt
  17. data []byte
  18. result interface{}
  19. }{
  20. //Basic type
  21. {
  22. stmt: &xsql.StreamStmt{
  23. Name: xsql.StreamName("demo"),
  24. StreamFields: []xsql.StreamField{
  25. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  26. },
  27. },
  28. data: []byte(`{"a": 6}`),
  29. result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
  30. },
  31. {
  32. stmt: &xsql.StreamStmt{
  33. Name: xsql.StreamName("demo"),
  34. StreamFields: nil,
  35. },
  36. data: []byte(`{"a": 6}`),
  37. result: &xsql.Tuple{Message: xsql.Message{
  38. "a": float64(6),
  39. },
  40. },
  41. },
  42. {
  43. stmt: &xsql.StreamStmt{
  44. Name: xsql.StreamName("demo"),
  45. StreamFields: []xsql.StreamField{
  46. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  47. },
  48. },
  49. data: []byte(`{"abc": 6}`),
  50. result: &xsql.Tuple{Message: xsql.Message{
  51. "abc": 6,
  52. },
  53. },
  54. },
  55. {
  56. stmt: &xsql.StreamStmt{
  57. Name: xsql.StreamName("demo"),
  58. StreamFields: nil,
  59. },
  60. data: []byte(`{"abc": 6}`),
  61. result: &xsql.Tuple{Message: xsql.Message{
  62. "abc": float64(6),
  63. },
  64. },
  65. },
  66. {
  67. stmt: &xsql.StreamStmt{
  68. Name: xsql.StreamName("demo"),
  69. StreamFields: []xsql.StreamField{
  70. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  71. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  72. },
  73. },
  74. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  75. result: &xsql.Tuple{Message: xsql.Message{
  76. "abc": float64(34),
  77. "def": "hello",
  78. },
  79. },
  80. },
  81. {
  82. stmt: &xsql.StreamStmt{
  83. Name: xsql.StreamName("demo"),
  84. StreamFields: nil,
  85. },
  86. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  87. result: &xsql.Tuple{Message: xsql.Message{
  88. "abc": float64(34),
  89. "def": "hello",
  90. "ghi": float64(50),
  91. },
  92. },
  93. },
  94. {
  95. stmt: &xsql.StreamStmt{
  96. Name: xsql.StreamName("demo"),
  97. StreamFields: []xsql.StreamField{
  98. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  99. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  100. },
  101. },
  102. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  103. result: &xsql.Tuple{Message: xsql.Message{
  104. "abc": float64(34),
  105. "def": "hello",
  106. },
  107. },
  108. },
  109. {
  110. stmt: &xsql.StreamStmt{
  111. Name: xsql.StreamName("demo"),
  112. StreamFields: []xsql.StreamField{
  113. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  114. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  115. },
  116. },
  117. data: []byte(`{"abc": 77, "def" : "hello"}`),
  118. result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
  119. },
  120. {
  121. stmt: &xsql.StreamStmt{
  122. Name: xsql.StreamName("demo"),
  123. StreamFields: []xsql.StreamField{
  124. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  125. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  126. },
  127. },
  128. data: []byte(`{"a": {"b" : "hello"}}`),
  129. result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
  130. },
  131. {
  132. stmt: &xsql.StreamStmt{
  133. Name: xsql.StreamName("demo"),
  134. StreamFields: nil,
  135. },
  136. data: []byte(`{"a": {"b" : "hello"}}`),
  137. result: &xsql.Tuple{Message: xsql.Message{
  138. "a": map[string]interface{}{
  139. "b": "hello",
  140. },
  141. },
  142. },
  143. },
  144. //Rec type
  145. {
  146. stmt: &xsql.StreamStmt{
  147. Name: xsql.StreamName("demo"),
  148. StreamFields: []xsql.StreamField{
  149. {Name: "a", FieldType: &xsql.RecType{
  150. StreamFields: []xsql.StreamField{
  151. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  152. },
  153. }},
  154. },
  155. },
  156. data: []byte(`{"a": {"b" : "hello"}}`),
  157. result: &xsql.Tuple{Message: xsql.Message{
  158. "a": map[string]interface{}{
  159. "b": "hello",
  160. },
  161. },
  162. },
  163. },
  164. //Rec type
  165. {
  166. stmt: &xsql.StreamStmt{
  167. Name: xsql.StreamName("demo"),
  168. StreamFields: []xsql.StreamField{
  169. {Name: "a", FieldType: &xsql.RecType{
  170. StreamFields: []xsql.StreamField{
  171. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  172. },
  173. }},
  174. },
  175. },
  176. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  177. result: &xsql.Tuple{Message: xsql.Message{
  178. "a": map[string]interface{}{
  179. "b": float64(32),
  180. },
  181. },
  182. },
  183. },
  184. {
  185. stmt: &xsql.StreamStmt{
  186. Name: xsql.StreamName("demo"),
  187. StreamFields: nil,
  188. },
  189. data: []byte(`{"a": {"b" : "32"}}`),
  190. result: &xsql.Tuple{Message: xsql.Message{
  191. "a": map[string]interface{}{
  192. "b": "32",
  193. },
  194. },
  195. },
  196. },
  197. //Array of complex type
  198. {
  199. stmt: &xsql.StreamStmt{
  200. Name: xsql.StreamName("demo"),
  201. StreamFields: []xsql.StreamField{
  202. {Name: "a", FieldType: &xsql.ArrayType{
  203. Type: xsql.STRUCT,
  204. FieldType: &xsql.RecType{
  205. StreamFields: []xsql.StreamField{
  206. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  207. },
  208. },
  209. }},
  210. },
  211. },
  212. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  213. result: &xsql.Tuple{Message: xsql.Message{
  214. "a": []map[string]interface{}{
  215. {"b": "hello1"},
  216. {"b": "hello2"},
  217. },
  218. },
  219. },
  220. },
  221. {
  222. stmt: &xsql.StreamStmt{
  223. Name: xsql.StreamName("demo"),
  224. StreamFields: nil,
  225. },
  226. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  227. result: &xsql.Tuple{Message: xsql.Message{
  228. "a": []interface{}{
  229. map[string]interface{}{"b": "hello1"},
  230. map[string]interface{}{"b": "hello2"},
  231. },
  232. },
  233. },
  234. },
  235. {
  236. stmt: &xsql.StreamStmt{
  237. Name: xsql.StreamName("demo"),
  238. StreamFields: []xsql.StreamField{
  239. {Name: "a", FieldType: &xsql.ArrayType{
  240. Type: xsql.FLOAT,
  241. }},
  242. },
  243. },
  244. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  245. result: &xsql.Tuple{Message: xsql.Message{
  246. "a": []float64{
  247. 55,
  248. 77,
  249. },
  250. },
  251. },
  252. },
  253. {
  254. stmt: &xsql.StreamStmt{
  255. Name: xsql.StreamName("demo"),
  256. StreamFields: nil,
  257. },
  258. data: []byte(`{"a": [55, 77]}`),
  259. result: &xsql.Tuple{Message: xsql.Message{
  260. "a": []interface{}{
  261. float64(55),
  262. float64(77),
  263. },
  264. },
  265. },
  266. },
  267. //Rec of complex type
  268. {
  269. stmt: &xsql.StreamStmt{
  270. Name: xsql.StreamName("demo"),
  271. StreamFields: []xsql.StreamField{
  272. {Name: "a", FieldType: &xsql.RecType{
  273. StreamFields: []xsql.StreamField{
  274. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  275. {Name: "c", FieldType: &xsql.RecType{
  276. StreamFields: []xsql.StreamField{
  277. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  278. },
  279. }},
  280. },
  281. }},
  282. },
  283. },
  284. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  285. result: &xsql.Tuple{Message: xsql.Message{
  286. "a": map[string]interface{}{
  287. "b": "hello",
  288. "c": map[string]interface{}{
  289. "d": int(35),
  290. },
  291. },
  292. },
  293. },
  294. },
  295. {
  296. stmt: &xsql.StreamStmt{
  297. Name: xsql.StreamName("demo"),
  298. StreamFields: nil,
  299. },
  300. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  301. result: &xsql.Tuple{Message: xsql.Message{
  302. "a": map[string]interface{}{
  303. "b": "hello",
  304. "c": map[string]interface{}{
  305. "d": 35.2,
  306. },
  307. },
  308. },
  309. },
  310. },
  311. }
  312. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  313. defer common.CloseLogger()
  314. contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
  315. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  316. for i, tt := range tests {
  317. pp := &Preprocessor{streamStmt: tt.stmt}
  318. dm := make(map[string]interface{})
  319. if e := json.Unmarshal(tt.data, &dm); e != nil {
  320. log.Fatal(e)
  321. return
  322. } else {
  323. tuple := &xsql.Tuple{Message: dm}
  324. result := pp.Apply(ctx, tuple)
  325. if !reflect.DeepEqual(tt.result, result) {
  326. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  327. }
  328. }
  329. }
  330. }
  331. func TestPreprocessorTime_Apply(t *testing.T) {
  332. var tests = []struct {
  333. stmt *xsql.StreamStmt
  334. data []byte
  335. result interface{}
  336. }{
  337. {
  338. stmt: &xsql.StreamStmt{
  339. Name: xsql.StreamName("demo"),
  340. StreamFields: []xsql.StreamField{
  341. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  342. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  343. },
  344. },
  345. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  346. result: &xsql.Tuple{Message: xsql.Message{
  347. "abc": common.TimeFromUnixMilli(1568854515000),
  348. "def": common.TimeFromUnixMilli(1568854573431),
  349. },
  350. },
  351. },
  352. {
  353. stmt: &xsql.StreamStmt{
  354. Name: xsql.StreamName("demo"),
  355. StreamFields: nil,
  356. },
  357. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  358. result: &xsql.Tuple{Message: xsql.Message{
  359. "abc": "2019-09-19T00:55:15.000Z",
  360. "def": float64(1568854573431),
  361. },
  362. },
  363. },
  364. {
  365. stmt: &xsql.StreamStmt{
  366. Name: xsql.StreamName("demo"),
  367. StreamFields: []xsql.StreamField{
  368. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  369. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  370. },
  371. },
  372. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  373. result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  374. },
  375. {
  376. stmt: &xsql.StreamStmt{
  377. Name: xsql.StreamName("demo"),
  378. StreamFields: []xsql.StreamField{
  379. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  380. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  381. },
  382. Options: map[string]string{
  383. "DATASOURCE": "users",
  384. "FORMAT": "AVRO",
  385. "KEY": "USERID",
  386. "CONF_KEY": "srv1",
  387. "TYPE": "MQTT",
  388. "TIMESTAMP": "USERID",
  389. "TIMESTAMP_FORMAT": "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  390. },
  391. },
  392. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  393. result: &xsql.Tuple{Message: xsql.Message{
  394. "abc": common.TimeFromUnixMilli(1568894115000),
  395. "def": common.TimeFromUnixMilli(1568854573431),
  396. }},
  397. },
  398. //Array type
  399. {
  400. stmt: &xsql.StreamStmt{
  401. Name: xsql.StreamName("demo"),
  402. StreamFields: []xsql.StreamField{
  403. {Name: "a", FieldType: &xsql.ArrayType{
  404. Type: xsql.DATETIME,
  405. }},
  406. },
  407. },
  408. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  409. result: &xsql.Tuple{Message: xsql.Message{
  410. "a": []time.Time{
  411. common.TimeFromUnixMilli(1568854515123),
  412. common.TimeFromUnixMilli(1568854573431),
  413. },
  414. },
  415. },
  416. },
  417. {
  418. stmt: &xsql.StreamStmt{
  419. Name: xsql.StreamName("demo"),
  420. StreamFields: []xsql.StreamField{
  421. {Name: "a", FieldType: &xsql.RecType{
  422. StreamFields: []xsql.StreamField{
  423. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  424. {Name: "c", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  425. },
  426. }},
  427. },
  428. },
  429. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  430. result: &xsql.Tuple{Message: xsql.Message{
  431. "a": map[string]interface{}{
  432. "b": "hello",
  433. "c": common.TimeFromUnixMilli(1568854515000),
  434. },
  435. },
  436. },
  437. },
  438. }
  439. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  440. defer common.CloseLogger()
  441. contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
  442. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  443. for i, tt := range tests {
  444. pp := &Preprocessor{streamStmt: tt.stmt}
  445. dm := make(map[string]interface{})
  446. if e := json.Unmarshal(tt.data, &dm); e != nil {
  447. log.Fatal(e)
  448. return
  449. } else {
  450. tuple := &xsql.Tuple{Message: dm}
  451. result := pp.Apply(ctx, tuple)
  452. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  453. if rt, ok := result.(*xsql.Tuple); ok {
  454. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  455. rt.Message["abc"] = rtt.UTC()
  456. }
  457. }
  458. if !reflect.DeepEqual(tt.result, result) {
  459. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  460. }
  461. }
  462. }
  463. }
  464. func TestPreprocessorEventtime_Apply(t *testing.T) {
  465. var tests = []struct {
  466. stmt *xsql.StreamStmt
  467. data []byte
  468. result interface{}
  469. }{
  470. //Basic type
  471. {
  472. stmt: &xsql.StreamStmt{
  473. Name: xsql.StreamName("demo"),
  474. StreamFields: []xsql.StreamField{
  475. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  476. },
  477. Options: map[string]string{
  478. "DATASOURCE": "users",
  479. "FORMAT": "AVRO",
  480. "KEY": "USERID",
  481. "CONF_KEY": "srv1",
  482. "TYPE": "MQTT",
  483. "TIMESTAMP": "abc",
  484. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  485. },
  486. },
  487. data: []byte(`{"abc": 1568854515000}`),
  488. result: &xsql.Tuple{Message: xsql.Message{
  489. "abc": int(1568854515000),
  490. }, Timestamp: 1568854515000,
  491. },
  492. },
  493. {
  494. stmt: &xsql.StreamStmt{
  495. Name: xsql.StreamName("demo"),
  496. StreamFields: nil,
  497. Options: map[string]string{
  498. "DATASOURCE": "users",
  499. "FORMAT": "AVRO",
  500. "KEY": "USERID",
  501. "CONF_KEY": "srv1",
  502. "TYPE": "MQTT",
  503. "TIMESTAMP": "abc",
  504. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  505. },
  506. },
  507. data: []byte(`{"abc": 1568854515000}`),
  508. result: &xsql.Tuple{Message: xsql.Message{
  509. "abc": float64(1568854515000),
  510. }, Timestamp: 1568854515000,
  511. },
  512. },
  513. {
  514. stmt: &xsql.StreamStmt{
  515. Name: xsql.StreamName("demo"),
  516. StreamFields: []xsql.StreamField{
  517. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  518. },
  519. Options: map[string]string{
  520. "DATASOURCE": "users",
  521. "TIMESTAMP": "abc",
  522. },
  523. },
  524. data: []byte(`{"abc": true}`),
  525. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  526. },
  527. {
  528. stmt: &xsql.StreamStmt{
  529. Name: xsql.StreamName("demo"),
  530. StreamFields: []xsql.StreamField{
  531. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  532. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  533. },
  534. Options: map[string]string{
  535. "DATASOURCE": "users",
  536. "TIMESTAMP": "def",
  537. },
  538. },
  539. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  540. result: &xsql.Tuple{Message: xsql.Message{
  541. "abc": float64(34),
  542. "def": "2019-09-23T02:47:29.754Z",
  543. }, Timestamp: int64(1569206849754),
  544. },
  545. },
  546. {
  547. stmt: &xsql.StreamStmt{
  548. Name: xsql.StreamName("demo"),
  549. StreamFields: []xsql.StreamField{
  550. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  551. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  552. },
  553. Options: map[string]string{
  554. "DATASOURCE": "users",
  555. "TIMESTAMP": "abc",
  556. },
  557. },
  558. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  559. result: &xsql.Tuple{Message: xsql.Message{
  560. "abc": common.TimeFromUnixMilli(1568854515000),
  561. "def": common.TimeFromUnixMilli(1568854573431),
  562. }, Timestamp: int64(1568854515000),
  563. },
  564. },
  565. {
  566. stmt: &xsql.StreamStmt{
  567. Name: xsql.StreamName("demo"),
  568. StreamFields: []xsql.StreamField{
  569. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  570. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  571. },
  572. Options: map[string]string{
  573. "DATASOURCE": "users",
  574. "TIMESTAMP": "def",
  575. "TIMESTAMP_FORMAT": "yyyy-MM-dd'AT'HH:mm:ss",
  576. },
  577. },
  578. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  579. result: &xsql.Tuple{Message: xsql.Message{
  580. "abc": float64(34),
  581. "def": "2019-09-23AT02:47:29",
  582. }, Timestamp: int64(1569206849000),
  583. },
  584. },
  585. {
  586. stmt: &xsql.StreamStmt{
  587. Name: xsql.StreamName("demo"),
  588. StreamFields: []xsql.StreamField{
  589. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  590. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  591. },
  592. Options: map[string]string{
  593. "DATASOURCE": "users",
  594. "TIMESTAMP": "def",
  595. "TIMESTAMP_FORMAT": "yyyy-MM-ddaHH:mm:ss",
  596. },
  597. },
  598. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  599. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
  600. },
  601. }
  602. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  603. defer common.CloseLogger()
  604. contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  605. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  606. for i, tt := range tests {
  607. pp, err := NewPreprocessor(tt.stmt, nil, true)
  608. if err != nil {
  609. t.Error(err)
  610. }
  611. dm := make(map[string]interface{})
  612. if e := json.Unmarshal(tt.data, &dm); e != nil {
  613. log.Fatal(e)
  614. return
  615. } else {
  616. tuple := &xsql.Tuple{Message: dm}
  617. result := pp.Apply(ctx, tuple)
  618. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  619. if rt, ok := result.(*xsql.Tuple); ok {
  620. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  621. rt.Message["abc"] = rtt.UTC()
  622. }
  623. }
  624. if !reflect.DeepEqual(tt.result, result) {
  625. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  626. }
  627. }
  628. }
  629. }
  630. func TestPreprocessorError(t *testing.T) {
  631. tests := []struct {
  632. stmt *xsql.StreamStmt
  633. data []byte
  634. result interface{}
  635. }{
  636. {
  637. stmt: &xsql.StreamStmt{
  638. Name: xsql.StreamName("demo"),
  639. StreamFields: []xsql.StreamField{
  640. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  641. },
  642. },
  643. data: []byte(`{"abc": "dafsad"}`),
  644. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
  645. }, {
  646. stmt: &xsql.StreamStmt{
  647. Name: xsql.StreamName("demo"),
  648. StreamFields: []xsql.StreamField{
  649. {Name: "a", FieldType: &xsql.RecType{
  650. StreamFields: []xsql.StreamField{
  651. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  652. },
  653. }},
  654. },
  655. },
  656. data: []byte(`{"a": {"d" : "hello"}}`),
  657. result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
  658. }, {
  659. stmt: &xsql.StreamStmt{
  660. Name: xsql.StreamName("demo"),
  661. StreamFields: []xsql.StreamField{
  662. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  663. },
  664. Options: map[string]string{
  665. "DATASOURCE": "users",
  666. "FORMAT": "AVRO",
  667. "KEY": "USERID",
  668. "CONF_KEY": "srv1",
  669. "TYPE": "MQTT",
  670. "TIMESTAMP": "abc",
  671. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  672. },
  673. },
  674. data: []byte(`{"abc": "not a time"}`),
  675. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
  676. },
  677. }
  678. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  679. defer common.CloseLogger()
  680. contextLogger := common.Log.WithField("rule", "TestPreprocessorError")
  681. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  682. for i, tt := range tests {
  683. pp := &Preprocessor{streamStmt: tt.stmt}
  684. dm := make(map[string]interface{})
  685. if e := json.Unmarshal(tt.data, &dm); e != nil {
  686. log.Fatal(e)
  687. return
  688. } else {
  689. tuple := &xsql.Tuple{Message: dm}
  690. result := pp.Apply(ctx, tuple)
  691. if !reflect.DeepEqual(tt.result, result) {
  692. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  693. }
  694. }
  695. }
  696. }