preprocessor_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "log"
  9. "reflect"
  10. "testing"
  11. "time"
  12. )
  13. func TestPreprocessor_Apply(t *testing.T) {
  14. var tests = []struct {
  15. stmt *xsql.StreamStmt
  16. data []byte
  17. result interface{}
  18. }{
  19. //Basic type
  20. {
  21. stmt: &xsql.StreamStmt{
  22. Name: xsql.StreamName("demo"),
  23. StreamFields: []xsql.StreamField{
  24. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  25. },
  26. },
  27. data: []byte(`{"a": 6}`),
  28. result: nil,
  29. },
  30. {
  31. stmt: &xsql.StreamStmt{
  32. Name: xsql.StreamName("demo"),
  33. StreamFields: []xsql.StreamField{
  34. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  35. },
  36. },
  37. data: []byte(`{"abc": 6}`),
  38. result: &xsql.Tuple{Message: xsql.Message{
  39. "abc": int(6),
  40. },
  41. },
  42. },
  43. {
  44. stmt: &xsql.StreamStmt{
  45. Name: xsql.StreamName("demo"),
  46. StreamFields: []xsql.StreamField{
  47. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  48. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  49. },
  50. },
  51. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  52. result: &xsql.Tuple{Message: xsql.Message{
  53. "abc": float64(34),
  54. "def": "hello",
  55. },
  56. },
  57. },
  58. {
  59. stmt: &xsql.StreamStmt{
  60. Name: xsql.StreamName("demo"),
  61. StreamFields: []xsql.StreamField{
  62. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  63. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  64. },
  65. },
  66. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  67. result: &xsql.Tuple{Message: xsql.Message{
  68. "abc": float64(34),
  69. "def": "hello",
  70. },
  71. },
  72. },
  73. {
  74. stmt: &xsql.StreamStmt{
  75. Name: xsql.StreamName("demo"),
  76. StreamFields: []xsql.StreamField{
  77. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  78. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  79. },
  80. },
  81. data: []byte(`{"abc": 77, "def" : "hello"}`),
  82. result: nil,
  83. },
  84. {
  85. stmt: &xsql.StreamStmt{
  86. Name: xsql.StreamName("demo"),
  87. StreamFields: []xsql.StreamField{
  88. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  89. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  90. },
  91. },
  92. data: []byte(`{"a": {"b" : "hello"}}`),
  93. result: nil,
  94. },
  95. //Rec type
  96. {
  97. stmt: &xsql.StreamStmt{
  98. Name: xsql.StreamName("demo"),
  99. StreamFields: []xsql.StreamField{
  100. {Name: "a", FieldType: &xsql.RecType{
  101. StreamFields: []xsql.StreamField{
  102. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  103. },
  104. }},
  105. },
  106. },
  107. data: []byte(`{"a": {"b" : "hello"}}`),
  108. result: &xsql.Tuple{Message: xsql.Message{
  109. "a": map[string]interface{}{
  110. "b": "hello",
  111. },
  112. },
  113. },
  114. },
  115. //Rec type
  116. {
  117. stmt: &xsql.StreamStmt{
  118. Name: xsql.StreamName("demo"),
  119. StreamFields: []xsql.StreamField{
  120. {Name: "a", FieldType: &xsql.RecType{
  121. StreamFields: []xsql.StreamField{
  122. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  123. },
  124. }},
  125. },
  126. },
  127. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  128. result: &xsql.Tuple{Message: xsql.Message{
  129. "a": map[string]interface{}{
  130. "b": float64(32),
  131. },
  132. },
  133. },
  134. },
  135. //Array of complex type
  136. {
  137. stmt: &xsql.StreamStmt{
  138. Name: xsql.StreamName("demo"),
  139. StreamFields: []xsql.StreamField{
  140. {Name: "a", FieldType: &xsql.ArrayType{
  141. Type: xsql.STRUCT,
  142. FieldType: &xsql.RecType{
  143. StreamFields: []xsql.StreamField{
  144. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  145. },
  146. },
  147. }},
  148. },
  149. },
  150. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  151. result: &xsql.Tuple{Message: xsql.Message{
  152. "a": []map[string]interface{}{
  153. {"b": "hello1"},
  154. {"b": "hello2"},
  155. },
  156. },
  157. },
  158. },
  159. {
  160. stmt: &xsql.StreamStmt{
  161. Name: xsql.StreamName("demo"),
  162. StreamFields: []xsql.StreamField{
  163. {Name: "a", FieldType: &xsql.ArrayType{
  164. Type: xsql.FLOAT,
  165. }},
  166. },
  167. },
  168. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  169. result: &xsql.Tuple{Message: xsql.Message{
  170. "a": []float64{
  171. 55,
  172. 77,
  173. },
  174. },
  175. },
  176. },
  177. //Rec of complex type
  178. {
  179. stmt: &xsql.StreamStmt{
  180. Name: xsql.StreamName("demo"),
  181. StreamFields: []xsql.StreamField{
  182. {Name: "a", FieldType: &xsql.RecType{
  183. StreamFields: []xsql.StreamField{
  184. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  185. {Name: "c", FieldType: &xsql.RecType{
  186. StreamFields: []xsql.StreamField{
  187. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  188. },
  189. }},
  190. },
  191. }},
  192. },
  193. },
  194. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  195. result: &xsql.Tuple{Message: xsql.Message{
  196. "a": map[string]interface{}{
  197. "b": "hello",
  198. "c": map[string]interface{}{
  199. "d": int(35),
  200. },
  201. },
  202. },
  203. },
  204. },
  205. }
  206. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  207. defer common.CloseLogger()
  208. contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
  209. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  210. for i, tt := range tests {
  211. pp := &Preprocessor{streamStmt: tt.stmt}
  212. dm := make(map[string]interface{})
  213. if e := json.Unmarshal(tt.data, &dm); e != nil {
  214. log.Fatal(e)
  215. return
  216. } else {
  217. tuple := &xsql.Tuple{Message:dm}
  218. result := pp.Apply(ctx, tuple)
  219. if !reflect.DeepEqual(tt.result, result) {
  220. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  221. }
  222. }
  223. }
  224. }
  225. func TestPreprocessorTime_Apply(t *testing.T){
  226. var tests = []struct {
  227. stmt *xsql.StreamStmt
  228. data []byte
  229. result interface{}
  230. }{
  231. {
  232. stmt: &xsql.StreamStmt{
  233. Name: xsql.StreamName("demo"),
  234. StreamFields: []xsql.StreamField{
  235. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  236. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  237. },
  238. },
  239. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  240. result: &xsql.Tuple{Message: xsql.Message{
  241. "abc": common.TimeFromUnixMilli(1568854515000),
  242. "def": common.TimeFromUnixMilli(1568854573431),
  243. },
  244. },
  245. },
  246. {
  247. stmt: &xsql.StreamStmt{
  248. Name: xsql.StreamName("demo"),
  249. StreamFields: []xsql.StreamField{
  250. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  251. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  252. },
  253. },
  254. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  255. result: nil,
  256. },
  257. {
  258. stmt: &xsql.StreamStmt{
  259. Name: xsql.StreamName("demo"),
  260. StreamFields: []xsql.StreamField{
  261. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  262. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  263. },
  264. Options: map[string]string{
  265. "DATASOURCE" : "users",
  266. "FORMAT" : "AVRO",
  267. "KEY" : "USERID",
  268. "CONF_KEY" : "srv1",
  269. "TYPE" : "MQTT",
  270. "TIMESTAMP" : "USERID",
  271. "TIMESTAMP_FORMAT" : "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  272. },
  273. },
  274. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  275. result: &xsql.Tuple{Message: xsql.Message{
  276. "abc": common.TimeFromUnixMilli(1568894115000),
  277. "def": common.TimeFromUnixMilli(1568854573431),
  278. }},
  279. },
  280. //Array type
  281. {
  282. stmt: &xsql.StreamStmt{
  283. Name: xsql.StreamName("demo"),
  284. StreamFields: []xsql.StreamField{
  285. {Name: "a", FieldType: &xsql.ArrayType{
  286. Type: xsql.DATETIME,
  287. }},
  288. },
  289. },
  290. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  291. result: &xsql.Tuple{Message: xsql.Message{
  292. "a": []time.Time{
  293. common.TimeFromUnixMilli(1568854515123),
  294. common.TimeFromUnixMilli(1568854573431),
  295. },
  296. },
  297. },
  298. },
  299. {
  300. stmt: &xsql.StreamStmt{
  301. Name: xsql.StreamName("demo"),
  302. StreamFields: []xsql.StreamField{
  303. {Name: "a", FieldType: &xsql.RecType{
  304. StreamFields: []xsql.StreamField{
  305. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  306. {Name: "c", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  307. },
  308. }},
  309. },
  310. },
  311. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  312. result: &xsql.Tuple{Message: xsql.Message{
  313. "a": map[string]interface{}{
  314. "b": "hello",
  315. "c": common.TimeFromUnixMilli(1568854515000),
  316. },
  317. },
  318. },
  319. },
  320. }
  321. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  322. defer common.CloseLogger()
  323. contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
  324. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  325. for i, tt := range tests {
  326. pp := &Preprocessor{streamStmt: tt.stmt}
  327. dm := make(map[string]interface{})
  328. if e := json.Unmarshal(tt.data, &dm); e != nil {
  329. log.Fatal(e)
  330. return
  331. } else {
  332. tuple := &xsql.Tuple{Message:dm}
  333. result := pp.Apply(ctx, tuple)
  334. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  335. if rt, ok := result.(*xsql.Tuple); ok{
  336. if rtt, ok := rt.Message["abc"].(time.Time); ok{
  337. rt.Message["abc"] = rtt.UTC()
  338. }
  339. }
  340. if !reflect.DeepEqual(tt.result, result) {
  341. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  342. }
  343. }
  344. }
  345. }
  346. func TestPreprocessorEventtime_Apply(t *testing.T) {
  347. var tests = []struct {
  348. stmt *xsql.StreamStmt
  349. data []byte
  350. result interface{}
  351. }{
  352. //Basic type
  353. {
  354. stmt: &xsql.StreamStmt{
  355. Name: xsql.StreamName("demo"),
  356. StreamFields: []xsql.StreamField{
  357. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  358. },
  359. Options: map[string]string{
  360. "DATASOURCE" : "users",
  361. "FORMAT" : "AVRO",
  362. "KEY" : "USERID",
  363. "CONF_KEY" : "srv1",
  364. "TYPE" : "MQTT",
  365. "TIMESTAMP" : "abc",
  366. "TIMESTAMP_FORMAT" : "yyyy-MM-dd''T''HH:mm:ssX'",
  367. },
  368. },
  369. data: []byte(`{"abc": 1568854515000}`),
  370. result: &xsql.Tuple{Message: xsql.Message{
  371. "abc": int(1568854515000),
  372. }, Timestamp: 1568854515000,
  373. },
  374. },
  375. {
  376. stmt: &xsql.StreamStmt{
  377. Name: xsql.StreamName("demo"),
  378. StreamFields: []xsql.StreamField{
  379. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  380. },
  381. Options: map[string]string{
  382. "DATASOURCE" : "users",
  383. "TIMESTAMP" : "abc",
  384. },
  385. },
  386. data: []byte(`{"abc": true}`),
  387. result: nil,
  388. },
  389. {
  390. stmt: &xsql.StreamStmt{
  391. Name: xsql.StreamName("demo"),
  392. StreamFields: []xsql.StreamField{
  393. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  394. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  395. },
  396. Options: map[string]string{
  397. "DATASOURCE" : "users",
  398. "TIMESTAMP" : "def",
  399. },
  400. },
  401. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  402. result: &xsql.Tuple{Message: xsql.Message{
  403. "abc": float64(34),
  404. "def": "2019-09-23T02:47:29.754Z",
  405. }, Timestamp: int64(1569206849754),
  406. },
  407. },
  408. {
  409. stmt: &xsql.StreamStmt{
  410. Name: xsql.StreamName("demo"),
  411. StreamFields: []xsql.StreamField{
  412. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  413. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  414. },
  415. Options: map[string]string{
  416. "DATASOURCE" : "users",
  417. "TIMESTAMP" : "abc",
  418. },
  419. },
  420. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  421. result: &xsql.Tuple{Message: xsql.Message{
  422. "abc": common.TimeFromUnixMilli(1568854515000),
  423. "def": common.TimeFromUnixMilli(1568854573431),
  424. }, Timestamp: int64(1568854515000),
  425. },
  426. },
  427. {
  428. stmt: &xsql.StreamStmt{
  429. Name: xsql.StreamName("demo"),
  430. StreamFields: []xsql.StreamField{
  431. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  432. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  433. },
  434. Options: map[string]string{
  435. "DATASOURCE" : "users",
  436. "TIMESTAMP" : "def",
  437. "TIMESTAMP_FORMAT" : "yyyy-MM-dd'AT'HH:mm:ss",
  438. },
  439. },
  440. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  441. result: &xsql.Tuple{Message: xsql.Message{
  442. "abc": float64(34),
  443. "def": "2019-09-23AT02:47:29",
  444. }, Timestamp: int64(1569206849000),
  445. },
  446. },
  447. {
  448. stmt: &xsql.StreamStmt{
  449. Name: xsql.StreamName("demo"),
  450. StreamFields: []xsql.StreamField{
  451. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  452. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  453. },
  454. Options: map[string]string{
  455. "DATASOURCE" : "users",
  456. "TIMESTAMP" : "def",
  457. "TIMESTAMP_FORMAT" : "yyyy-MM-ddaHH:mm:ss",
  458. },
  459. },
  460. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  461. result: nil,
  462. },
  463. }
  464. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  465. defer common.CloseLogger()
  466. contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  467. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  468. for i, tt := range tests {
  469. pp, err := NewPreprocessor(tt.stmt, nil,true)
  470. if err != nil{
  471. t.Error(err)
  472. }
  473. dm := make(map[string]interface{})
  474. if e := json.Unmarshal(tt.data, &dm); e != nil {
  475. log.Fatal(e)
  476. return
  477. } else {
  478. tuple := &xsql.Tuple{Message:dm}
  479. result := pp.Apply(ctx, tuple)
  480. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  481. if rt, ok := result.(*xsql.Tuple); ok{
  482. if rtt, ok := rt.Message["abc"].(time.Time); ok{
  483. rt.Message["abc"] = rtt.UTC()
  484. }
  485. }
  486. if !reflect.DeepEqual(tt.result, result) {
  487. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  488. }
  489. }
  490. }
  491. }