preprocessor_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "log"
  9. "reflect"
  10. "testing"
  11. "time"
  12. )
  13. func TestPreprocessor_Apply(t *testing.T) {
  14. var tests = []struct {
  15. stmt *xsql.StreamStmt
  16. data []byte
  17. result interface{}
  18. }{
  19. //Basic type
  20. {
  21. stmt: &xsql.StreamStmt{
  22. Name: xsql.StreamName("demo"),
  23. StreamFields: []xsql.StreamField{
  24. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  25. },
  26. },
  27. data: []byte(`{"a": 6}`),
  28. result: nil,
  29. },
  30. {
  31. stmt: &xsql.StreamStmt{
  32. Name: xsql.StreamName("demo"),
  33. StreamFields: nil,
  34. },
  35. data: []byte(`{"a": 6}`),
  36. result: &xsql.Tuple{Message: xsql.Message{
  37. "a": float64(6),
  38. },
  39. },
  40. },
  41. {
  42. stmt: &xsql.StreamStmt{
  43. Name: xsql.StreamName("demo"),
  44. StreamFields: []xsql.StreamField{
  45. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  46. },
  47. },
  48. data: []byte(`{"abc": 6}`),
  49. result: &xsql.Tuple{Message: xsql.Message{
  50. "abc": 6,
  51. },
  52. },
  53. },
  54. {
  55. stmt: &xsql.StreamStmt{
  56. Name: xsql.StreamName("demo"),
  57. StreamFields: nil,
  58. },
  59. data: []byte(`{"abc": 6}`),
  60. result: &xsql.Tuple{Message: xsql.Message{
  61. "abc": float64(6),
  62. },
  63. },
  64. },
  65. {
  66. stmt: &xsql.StreamStmt{
  67. Name: xsql.StreamName("demo"),
  68. StreamFields: []xsql.StreamField{
  69. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  70. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  71. },
  72. },
  73. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  74. result: &xsql.Tuple{Message: xsql.Message{
  75. "abc": float64(34),
  76. "def": "hello",
  77. },
  78. },
  79. },
  80. {
  81. stmt: &xsql.StreamStmt{
  82. Name: xsql.StreamName("demo"),
  83. StreamFields: nil,
  84. },
  85. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  86. result: &xsql.Tuple{Message: xsql.Message{
  87. "abc": float64(34),
  88. "def": "hello",
  89. "ghi": float64(50),
  90. },
  91. },
  92. },
  93. {
  94. stmt: &xsql.StreamStmt{
  95. Name: xsql.StreamName("demo"),
  96. StreamFields: []xsql.StreamField{
  97. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  98. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  99. },
  100. },
  101. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  102. result: &xsql.Tuple{Message: xsql.Message{
  103. "abc": float64(34),
  104. "def": "hello",
  105. },
  106. },
  107. },
  108. {
  109. stmt: &xsql.StreamStmt{
  110. Name: xsql.StreamName("demo"),
  111. StreamFields: []xsql.StreamField{
  112. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  113. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  114. },
  115. },
  116. data: []byte(`{"abc": 77, "def" : "hello"}`),
  117. result: nil,
  118. },
  119. {
  120. stmt: &xsql.StreamStmt{
  121. Name: xsql.StreamName("demo"),
  122. StreamFields: []xsql.StreamField{
  123. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  124. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  125. },
  126. },
  127. data: []byte(`{"a": {"b" : "hello"}}`),
  128. result: nil,
  129. },
  130. {
  131. stmt: &xsql.StreamStmt{
  132. Name: xsql.StreamName("demo"),
  133. StreamFields: nil,
  134. },
  135. data: []byte(`{"a": {"b" : "hello"}}`),
  136. result: &xsql.Tuple{Message: xsql.Message{
  137. "a": map[string]interface{}{
  138. "b": "hello",
  139. },
  140. },
  141. },
  142. },
  143. //Rec type
  144. {
  145. stmt: &xsql.StreamStmt{
  146. Name: xsql.StreamName("demo"),
  147. StreamFields: []xsql.StreamField{
  148. {Name: "a", FieldType: &xsql.RecType{
  149. StreamFields: []xsql.StreamField{
  150. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  151. },
  152. }},
  153. },
  154. },
  155. data: []byte(`{"a": {"b" : "hello"}}`),
  156. result: &xsql.Tuple{Message: xsql.Message{
  157. "a": map[string]interface{}{
  158. "b": "hello",
  159. },
  160. },
  161. },
  162. },
  163. //Rec type
  164. {
  165. stmt: &xsql.StreamStmt{
  166. Name: xsql.StreamName("demo"),
  167. StreamFields: []xsql.StreamField{
  168. {Name: "a", FieldType: &xsql.RecType{
  169. StreamFields: []xsql.StreamField{
  170. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  171. },
  172. }},
  173. },
  174. },
  175. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  176. result: &xsql.Tuple{Message: xsql.Message{
  177. "a": map[string]interface{}{
  178. "b": float64(32),
  179. },
  180. },
  181. },
  182. },
  183. {
  184. stmt: &xsql.StreamStmt{
  185. Name: xsql.StreamName("demo"),
  186. StreamFields: nil,
  187. },
  188. data: []byte(`{"a": {"b" : "32"}}`),
  189. result: &xsql.Tuple{Message: xsql.Message{
  190. "a": map[string]interface{}{
  191. "b": "32",
  192. },
  193. },
  194. },
  195. },
  196. //Array of complex type
  197. {
  198. stmt: &xsql.StreamStmt{
  199. Name: xsql.StreamName("demo"),
  200. StreamFields: []xsql.StreamField{
  201. {Name: "a", FieldType: &xsql.ArrayType{
  202. Type: xsql.STRUCT,
  203. FieldType: &xsql.RecType{
  204. StreamFields: []xsql.StreamField{
  205. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  206. },
  207. },
  208. }},
  209. },
  210. },
  211. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  212. result: &xsql.Tuple{Message: xsql.Message{
  213. "a": []map[string]interface{}{
  214. {"b": "hello1"},
  215. {"b": "hello2"},
  216. },
  217. },
  218. },
  219. },
  220. {
  221. stmt: &xsql.StreamStmt{
  222. Name: xsql.StreamName("demo"),
  223. StreamFields: nil,
  224. },
  225. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  226. result: &xsql.Tuple{Message: xsql.Message{
  227. "a": []interface{}{
  228. map[string]interface{}{"b": "hello1"},
  229. map[string]interface{}{"b": "hello2"},
  230. },
  231. },
  232. },
  233. },
  234. {
  235. stmt: &xsql.StreamStmt{
  236. Name: xsql.StreamName("demo"),
  237. StreamFields: []xsql.StreamField{
  238. {Name: "a", FieldType: &xsql.ArrayType{
  239. Type: xsql.FLOAT,
  240. }},
  241. },
  242. },
  243. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  244. result: &xsql.Tuple{Message: xsql.Message{
  245. "a": []float64{
  246. 55,
  247. 77,
  248. },
  249. },
  250. },
  251. },
  252. {
  253. stmt: &xsql.StreamStmt{
  254. Name: xsql.StreamName("demo"),
  255. StreamFields: nil,
  256. },
  257. data: []byte(`{"a": [55, 77]}`),
  258. result: &xsql.Tuple{Message: xsql.Message{
  259. "a": []interface{}{
  260. float64(55),
  261. float64(77),
  262. },
  263. },
  264. },
  265. },
  266. //Rec of complex type
  267. {
  268. stmt: &xsql.StreamStmt{
  269. Name: xsql.StreamName("demo"),
  270. StreamFields: []xsql.StreamField{
  271. {Name: "a", FieldType: &xsql.RecType{
  272. StreamFields: []xsql.StreamField{
  273. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  274. {Name: "c", FieldType: &xsql.RecType{
  275. StreamFields: []xsql.StreamField{
  276. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  277. },
  278. }},
  279. },
  280. }},
  281. },
  282. },
  283. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  284. result: &xsql.Tuple{Message: xsql.Message{
  285. "a": map[string]interface{}{
  286. "b": "hello",
  287. "c": map[string]interface{}{
  288. "d": int(35),
  289. },
  290. },
  291. },
  292. },
  293. },
  294. {
  295. stmt: &xsql.StreamStmt{
  296. Name: xsql.StreamName("demo"),
  297. StreamFields: nil,
  298. },
  299. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  300. result: &xsql.Tuple{Message: xsql.Message{
  301. "a": map[string]interface{}{
  302. "b": "hello",
  303. "c": map[string]interface{}{
  304. "d": 35.2,
  305. },
  306. },
  307. },
  308. },
  309. },
  310. }
  311. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  312. defer common.CloseLogger()
  313. contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
  314. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  315. for i, tt := range tests {
  316. pp := &Preprocessor{streamStmt: tt.stmt}
  317. dm := make(map[string]interface{})
  318. if e := json.Unmarshal(tt.data, &dm); e != nil {
  319. log.Fatal(e)
  320. return
  321. } else {
  322. tuple := &xsql.Tuple{Message: dm}
  323. result := pp.Apply(ctx, tuple)
  324. if !reflect.DeepEqual(tt.result, result) {
  325. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  326. }
  327. }
  328. }
  329. }
  330. func TestPreprocessorTime_Apply(t *testing.T) {
  331. var tests = []struct {
  332. stmt *xsql.StreamStmt
  333. data []byte
  334. result interface{}
  335. }{
  336. {
  337. stmt: &xsql.StreamStmt{
  338. Name: xsql.StreamName("demo"),
  339. StreamFields: []xsql.StreamField{
  340. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  341. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  342. },
  343. },
  344. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  345. result: &xsql.Tuple{Message: xsql.Message{
  346. "abc": common.TimeFromUnixMilli(1568854515000),
  347. "def": common.TimeFromUnixMilli(1568854573431),
  348. },
  349. },
  350. },
  351. {
  352. stmt: &xsql.StreamStmt{
  353. Name: xsql.StreamName("demo"),
  354. StreamFields: nil,
  355. },
  356. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  357. result: &xsql.Tuple{Message: xsql.Message{
  358. "abc": "2019-09-19T00:55:15.000Z",
  359. "def": float64(1568854573431),
  360. },
  361. },
  362. },
  363. {
  364. stmt: &xsql.StreamStmt{
  365. Name: xsql.StreamName("demo"),
  366. StreamFields: []xsql.StreamField{
  367. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  368. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  369. },
  370. },
  371. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  372. result: nil,
  373. },
  374. {
  375. stmt: &xsql.StreamStmt{
  376. Name: xsql.StreamName("demo"),
  377. StreamFields: []xsql.StreamField{
  378. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  379. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  380. },
  381. Options: map[string]string{
  382. "DATASOURCE": "users",
  383. "FORMAT": "AVRO",
  384. "KEY": "USERID",
  385. "CONF_KEY": "srv1",
  386. "TYPE": "MQTT",
  387. "TIMESTAMP": "USERID",
  388. "TIMESTAMP_FORMAT": "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  389. },
  390. },
  391. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  392. result: &xsql.Tuple{Message: xsql.Message{
  393. "abc": common.TimeFromUnixMilli(1568894115000),
  394. "def": common.TimeFromUnixMilli(1568854573431),
  395. }},
  396. },
  397. //Array type
  398. {
  399. stmt: &xsql.StreamStmt{
  400. Name: xsql.StreamName("demo"),
  401. StreamFields: []xsql.StreamField{
  402. {Name: "a", FieldType: &xsql.ArrayType{
  403. Type: xsql.DATETIME,
  404. }},
  405. },
  406. },
  407. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  408. result: &xsql.Tuple{Message: xsql.Message{
  409. "a": []time.Time{
  410. common.TimeFromUnixMilli(1568854515123),
  411. common.TimeFromUnixMilli(1568854573431),
  412. },
  413. },
  414. },
  415. },
  416. {
  417. stmt: &xsql.StreamStmt{
  418. Name: xsql.StreamName("demo"),
  419. StreamFields: []xsql.StreamField{
  420. {Name: "a", FieldType: &xsql.RecType{
  421. StreamFields: []xsql.StreamField{
  422. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  423. {Name: "c", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  424. },
  425. }},
  426. },
  427. },
  428. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  429. result: &xsql.Tuple{Message: xsql.Message{
  430. "a": map[string]interface{}{
  431. "b": "hello",
  432. "c": common.TimeFromUnixMilli(1568854515000),
  433. },
  434. },
  435. },
  436. },
  437. }
  438. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  439. defer common.CloseLogger()
  440. contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
  441. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  442. for i, tt := range tests {
  443. pp := &Preprocessor{streamStmt: tt.stmt}
  444. dm := make(map[string]interface{})
  445. if e := json.Unmarshal(tt.data, &dm); e != nil {
  446. log.Fatal(e)
  447. return
  448. } else {
  449. tuple := &xsql.Tuple{Message: dm}
  450. result := pp.Apply(ctx, tuple)
  451. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  452. if rt, ok := result.(*xsql.Tuple); ok {
  453. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  454. rt.Message["abc"] = rtt.UTC()
  455. }
  456. }
  457. if !reflect.DeepEqual(tt.result, result) {
  458. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  459. }
  460. }
  461. }
  462. }
  463. func TestPreprocessorEventtime_Apply(t *testing.T) {
  464. var tests = []struct {
  465. stmt *xsql.StreamStmt
  466. data []byte
  467. result interface{}
  468. }{
  469. //Basic type
  470. {
  471. stmt: &xsql.StreamStmt{
  472. Name: xsql.StreamName("demo"),
  473. StreamFields: []xsql.StreamField{
  474. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  475. },
  476. Options: map[string]string{
  477. "DATASOURCE": "users",
  478. "FORMAT": "AVRO",
  479. "KEY": "USERID",
  480. "CONF_KEY": "srv1",
  481. "TYPE": "MQTT",
  482. "TIMESTAMP": "abc",
  483. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  484. },
  485. },
  486. data: []byte(`{"abc": 1568854515000}`),
  487. result: &xsql.Tuple{Message: xsql.Message{
  488. "abc": int(1568854515000),
  489. }, Timestamp: 1568854515000,
  490. },
  491. },
  492. {
  493. stmt: &xsql.StreamStmt{
  494. Name: xsql.StreamName("demo"),
  495. StreamFields: nil,
  496. Options: map[string]string{
  497. "DATASOURCE": "users",
  498. "FORMAT": "AVRO",
  499. "KEY": "USERID",
  500. "CONF_KEY": "srv1",
  501. "TYPE": "MQTT",
  502. "TIMESTAMP": "abc",
  503. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  504. },
  505. },
  506. data: []byte(`{"abc": 1568854515000}`),
  507. result: &xsql.Tuple{Message: xsql.Message{
  508. "abc": float64(1568854515000),
  509. }, Timestamp: 1568854515000,
  510. },
  511. },
  512. {
  513. stmt: &xsql.StreamStmt{
  514. Name: xsql.StreamName("demo"),
  515. StreamFields: []xsql.StreamField{
  516. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  517. },
  518. Options: map[string]string{
  519. "DATASOURCE": "users",
  520. "TIMESTAMP": "abc",
  521. },
  522. },
  523. data: []byte(`{"abc": true}`),
  524. result: nil,
  525. },
  526. {
  527. stmt: &xsql.StreamStmt{
  528. Name: xsql.StreamName("demo"),
  529. StreamFields: []xsql.StreamField{
  530. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  531. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  532. },
  533. Options: map[string]string{
  534. "DATASOURCE": "users",
  535. "TIMESTAMP": "def",
  536. },
  537. },
  538. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  539. result: &xsql.Tuple{Message: xsql.Message{
  540. "abc": float64(34),
  541. "def": "2019-09-23T02:47:29.754Z",
  542. }, Timestamp: int64(1569206849754),
  543. },
  544. },
  545. {
  546. stmt: &xsql.StreamStmt{
  547. Name: xsql.StreamName("demo"),
  548. StreamFields: []xsql.StreamField{
  549. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  550. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  551. },
  552. Options: map[string]string{
  553. "DATASOURCE": "users",
  554. "TIMESTAMP": "abc",
  555. },
  556. },
  557. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  558. result: &xsql.Tuple{Message: xsql.Message{
  559. "abc": common.TimeFromUnixMilli(1568854515000),
  560. "def": common.TimeFromUnixMilli(1568854573431),
  561. }, Timestamp: int64(1568854515000),
  562. },
  563. },
  564. {
  565. stmt: &xsql.StreamStmt{
  566. Name: xsql.StreamName("demo"),
  567. StreamFields: []xsql.StreamField{
  568. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  569. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  570. },
  571. Options: map[string]string{
  572. "DATASOURCE": "users",
  573. "TIMESTAMP": "def",
  574. "TIMESTAMP_FORMAT": "yyyy-MM-dd'AT'HH:mm:ss",
  575. },
  576. },
  577. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  578. result: &xsql.Tuple{Message: xsql.Message{
  579. "abc": float64(34),
  580. "def": "2019-09-23AT02:47:29",
  581. }, Timestamp: int64(1569206849000),
  582. },
  583. },
  584. {
  585. stmt: &xsql.StreamStmt{
  586. Name: xsql.StreamName("demo"),
  587. StreamFields: []xsql.StreamField{
  588. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  589. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  590. },
  591. Options: map[string]string{
  592. "DATASOURCE": "users",
  593. "TIMESTAMP": "def",
  594. "TIMESTAMP_FORMAT": "yyyy-MM-ddaHH:mm:ss",
  595. },
  596. },
  597. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  598. result: nil,
  599. },
  600. }
  601. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  602. defer common.CloseLogger()
  603. contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  604. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  605. for i, tt := range tests {
  606. pp, err := NewPreprocessor(tt.stmt, nil, true)
  607. if err != nil {
  608. t.Error(err)
  609. }
  610. dm := make(map[string]interface{})
  611. if e := json.Unmarshal(tt.data, &dm); e != nil {
  612. log.Fatal(e)
  613. return
  614. } else {
  615. tuple := &xsql.Tuple{Message: dm}
  616. result := pp.Apply(ctx, tuple)
  617. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  618. if rt, ok := result.(*xsql.Tuple); ok {
  619. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  620. rt.Message["abc"] = rtt.UTC()
  621. }
  622. }
  623. if !reflect.DeepEqual(tt.result, result) {
  624. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  625. }
  626. }
  627. }
  628. }