preprocessor_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. package plans
  2. import (
  3. "encoding/json"
  4. "engine/common"
  5. "engine/xsql"
  6. "fmt"
  7. "log"
  8. "reflect"
  9. "testing"
  10. "time"
  11. )
  12. func TestPreprocessor_Apply(t *testing.T) {
  13. var tests = []struct {
  14. stmt *xsql.StreamStmt
  15. data []byte
  16. result interface{}
  17. }{
  18. //Basic type
  19. {
  20. stmt: &xsql.StreamStmt{
  21. Name: xsql.StreamName("demo"),
  22. StreamFields: []xsql.StreamField{
  23. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  24. },
  25. },
  26. data: []byte(`{"a": 6}`),
  27. result: nil,
  28. },
  29. {
  30. stmt: &xsql.StreamStmt{
  31. Name: xsql.StreamName("demo"),
  32. StreamFields: []xsql.StreamField{
  33. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  34. },
  35. },
  36. data: []byte(`{"abc": 6}`),
  37. result: &xsql.Tuple{Message: xsql.Message{
  38. "abc": int(6),
  39. },
  40. },
  41. },
  42. {
  43. stmt: &xsql.StreamStmt{
  44. Name: xsql.StreamName("demo"),
  45. StreamFields: []xsql.StreamField{
  46. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  47. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  48. },
  49. },
  50. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  51. result: &xsql.Tuple{Message: xsql.Message{
  52. "abc": float64(34),
  53. "def": "hello",
  54. },
  55. },
  56. },
  57. {
  58. stmt: &xsql.StreamStmt{
  59. Name: xsql.StreamName("demo"),
  60. StreamFields: []xsql.StreamField{
  61. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  62. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  63. },
  64. },
  65. data: []byte(`{"abc": 77, "def" : "hello"}`),
  66. result: nil,
  67. },
  68. {
  69. stmt: &xsql.StreamStmt{
  70. Name: xsql.StreamName("demo"),
  71. StreamFields: []xsql.StreamField{
  72. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  73. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  74. },
  75. },
  76. data: []byte(`{"a": {"b" : "hello"}}`),
  77. result: nil,
  78. },
  79. //Rec type
  80. {
  81. stmt: &xsql.StreamStmt{
  82. Name: xsql.StreamName("demo"),
  83. StreamFields: []xsql.StreamField{
  84. {Name: "a", FieldType: &xsql.RecType{
  85. StreamFields: []xsql.StreamField{
  86. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  87. },
  88. }},
  89. },
  90. },
  91. data: []byte(`{"a": {"b" : "hello"}}`),
  92. result: &xsql.Tuple{Message: xsql.Message{
  93. "a": map[string]interface{}{
  94. "b": "hello",
  95. },
  96. },
  97. },
  98. },
  99. //Array of complex type
  100. {
  101. stmt: &xsql.StreamStmt{
  102. Name: xsql.StreamName("demo"),
  103. StreamFields: []xsql.StreamField{
  104. {Name: "a", FieldType: &xsql.ArrayType{
  105. Type: xsql.STRUCT,
  106. FieldType: &xsql.RecType{
  107. StreamFields: []xsql.StreamField{
  108. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  109. },
  110. },
  111. }},
  112. },
  113. },
  114. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  115. result: &xsql.Tuple{Message: xsql.Message{
  116. "a": []map[string]interface{}{
  117. {"b": "hello1"},
  118. {"b": "hello2"},
  119. },
  120. },
  121. },
  122. },
  123. //Rec of complex type
  124. {
  125. stmt: &xsql.StreamStmt{
  126. Name: xsql.StreamName("demo"),
  127. StreamFields: []xsql.StreamField{
  128. {Name: "a", FieldType: &xsql.RecType{
  129. StreamFields: []xsql.StreamField{
  130. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  131. {Name: "c", FieldType: &xsql.RecType{
  132. StreamFields: []xsql.StreamField{
  133. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  134. },
  135. }},
  136. },
  137. }},
  138. },
  139. },
  140. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  141. result: &xsql.Tuple{Message: xsql.Message{
  142. "a": map[string]interface{}{
  143. "b": "hello",
  144. "c": map[string]interface{}{
  145. "d": int(35),
  146. },
  147. },
  148. },
  149. },
  150. },
  151. }
  152. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  153. defer common.CloseLogger()
  154. for i, tt := range tests {
  155. pp := &Preprocessor{streamStmt: tt.stmt}
  156. dm := make(map[string]interface{})
  157. if e := json.Unmarshal(tt.data, &dm); e != nil {
  158. log.Fatal(e)
  159. return
  160. } else {
  161. tuple := &xsql.Tuple{Message:dm}
  162. result := pp.Apply(nil, tuple)
  163. if !reflect.DeepEqual(tt.result, result) {
  164. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  165. }
  166. }
  167. }
  168. }
  169. func TestPreprocessorTime_Apply(t *testing.T){
  170. var tests = []struct {
  171. stmt *xsql.StreamStmt
  172. data []byte
  173. result interface{}
  174. }{
  175. {
  176. stmt: &xsql.StreamStmt{
  177. Name: xsql.StreamName("demo"),
  178. StreamFields: []xsql.StreamField{
  179. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  180. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  181. },
  182. },
  183. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  184. result: &xsql.Tuple{Message: xsql.Message{
  185. "abc": common.TimeFromUnixMilli(1568854515000),
  186. "def": common.TimeFromUnixMilli(1568854573431),
  187. },
  188. },
  189. },
  190. {
  191. stmt: &xsql.StreamStmt{
  192. Name: xsql.StreamName("demo"),
  193. StreamFields: []xsql.StreamField{
  194. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  195. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  196. },
  197. },
  198. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  199. result: nil,
  200. },
  201. {
  202. stmt: &xsql.StreamStmt{
  203. Name: xsql.StreamName("demo"),
  204. StreamFields: []xsql.StreamField{
  205. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  206. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  207. },
  208. Options: map[string]string{
  209. "DATASOURCE" : "users",
  210. "FORMAT" : "AVRO",
  211. "KEY" : "USERID",
  212. "CONF_KEY" : "srv1",
  213. "TYPE" : "MQTT",
  214. "TIMESTAMP" : "USERID",
  215. "TIMESTAMP_FORMAT" : "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  216. },
  217. },
  218. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  219. result: &xsql.Tuple{Message: xsql.Message{
  220. "abc": common.TimeFromUnixMilli(1568894115000),
  221. "def": common.TimeFromUnixMilli(1568854573431),
  222. }},
  223. },
  224. //Array type
  225. {
  226. stmt: &xsql.StreamStmt{
  227. Name: xsql.StreamName("demo"),
  228. StreamFields: []xsql.StreamField{
  229. {Name: "a", FieldType: &xsql.ArrayType{
  230. Type: xsql.DATETIME,
  231. }},
  232. },
  233. },
  234. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  235. result: &xsql.Tuple{Message: xsql.Message{
  236. "a": []time.Time{
  237. common.TimeFromUnixMilli(1568854515123),
  238. common.TimeFromUnixMilli(1568854573431),
  239. },
  240. },
  241. },
  242. },
  243. {
  244. stmt: &xsql.StreamStmt{
  245. Name: xsql.StreamName("demo"),
  246. StreamFields: []xsql.StreamField{
  247. {Name: "a", FieldType: &xsql.RecType{
  248. StreamFields: []xsql.StreamField{
  249. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  250. {Name: "c", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  251. },
  252. }},
  253. },
  254. },
  255. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  256. result: &xsql.Tuple{Message: xsql.Message{
  257. "a": map[string]interface{}{
  258. "b": "hello",
  259. "c": common.TimeFromUnixMilli(1568854515000),
  260. },
  261. },
  262. },
  263. },
  264. }
  265. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  266. defer common.CloseLogger()
  267. for i, tt := range tests {
  268. pp := &Preprocessor{streamStmt: tt.stmt}
  269. dm := make(map[string]interface{})
  270. if e := json.Unmarshal(tt.data, &dm); e != nil {
  271. log.Fatal(e)
  272. return
  273. } else {
  274. tuple := &xsql.Tuple{Message:dm}
  275. result := pp.Apply(nil, tuple)
  276. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  277. if rt, ok := result.(*xsql.Tuple); ok{
  278. if rtt, ok := rt.Message["abc"].(time.Time); ok{
  279. rt.Message["abc"] = rtt.UTC()
  280. }
  281. }
  282. if !reflect.DeepEqual(tt.result, result) {
  283. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  284. }
  285. }
  286. }
  287. }
  288. func TestPreprocessorEventtime_Apply(t *testing.T) {
  289. var tests = []struct {
  290. stmt *xsql.StreamStmt
  291. data []byte
  292. result interface{}
  293. }{
  294. //Basic type
  295. {
  296. stmt: &xsql.StreamStmt{
  297. Name: xsql.StreamName("demo"),
  298. StreamFields: []xsql.StreamField{
  299. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  300. },
  301. Options: map[string]string{
  302. "DATASOURCE" : "users",
  303. "FORMAT" : "AVRO",
  304. "KEY" : "USERID",
  305. "CONF_KEY" : "srv1",
  306. "TYPE" : "MQTT",
  307. "TIMESTAMP" : "abc",
  308. "TIMESTAMP_FORMAT" : "yyyy-MM-dd''T''HH:mm:ssX'",
  309. },
  310. },
  311. data: []byte(`{"abc": 1568854515000}`),
  312. result: &xsql.Tuple{Message: xsql.Message{
  313. "abc": int(1568854515000),
  314. }, Timestamp: 1568854515000,
  315. },
  316. },
  317. {
  318. stmt: &xsql.StreamStmt{
  319. Name: xsql.StreamName("demo"),
  320. StreamFields: []xsql.StreamField{
  321. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  322. },
  323. Options: map[string]string{
  324. "DATASOURCE" : "users",
  325. "TIMESTAMP" : "abc",
  326. },
  327. },
  328. data: []byte(`{"abc": true}`),
  329. result: nil,
  330. },
  331. {
  332. stmt: &xsql.StreamStmt{
  333. Name: xsql.StreamName("demo"),
  334. StreamFields: []xsql.StreamField{
  335. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  336. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  337. },
  338. Options: map[string]string{
  339. "DATASOURCE" : "users",
  340. "TIMESTAMP" : "def",
  341. },
  342. },
  343. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  344. result: &xsql.Tuple{Message: xsql.Message{
  345. "abc": float64(34),
  346. "def": "2019-09-23T02:47:29.754Z",
  347. }, Timestamp: int64(1569206849754),
  348. },
  349. },
  350. {
  351. stmt: &xsql.StreamStmt{
  352. Name: xsql.StreamName("demo"),
  353. StreamFields: []xsql.StreamField{
  354. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  355. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  356. },
  357. Options: map[string]string{
  358. "DATASOURCE" : "users",
  359. "TIMESTAMP" : "abc",
  360. },
  361. },
  362. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  363. result: &xsql.Tuple{Message: xsql.Message{
  364. "abc": common.TimeFromUnixMilli(1568854515000),
  365. "def": common.TimeFromUnixMilli(1568854573431),
  366. }, Timestamp: int64(1568854515000),
  367. },
  368. },
  369. {
  370. stmt: &xsql.StreamStmt{
  371. Name: xsql.StreamName("demo"),
  372. StreamFields: []xsql.StreamField{
  373. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  374. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  375. },
  376. Options: map[string]string{
  377. "DATASOURCE" : "users",
  378. "TIMESTAMP" : "def",
  379. "TIMESTAMP_FORMAT" : "yyyy-MM-dd'AT'HH:mm:ss",
  380. },
  381. },
  382. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  383. result: &xsql.Tuple{Message: xsql.Message{
  384. "abc": float64(34),
  385. "def": "2019-09-23AT02:47:29",
  386. }, Timestamp: int64(1569206849000),
  387. },
  388. },
  389. {
  390. stmt: &xsql.StreamStmt{
  391. Name: xsql.StreamName("demo"),
  392. StreamFields: []xsql.StreamField{
  393. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  394. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  395. },
  396. Options: map[string]string{
  397. "DATASOURCE" : "users",
  398. "TIMESTAMP" : "def",
  399. "TIMESTAMP_FORMAT" : "yyyy-MM-ddaHH:mm:ss",
  400. },
  401. },
  402. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  403. result: nil,
  404. },
  405. }
  406. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  407. defer common.CloseLogger()
  408. for i, tt := range tests {
  409. pp, err := NewPreprocessor(tt.stmt, true)
  410. if err != nil{
  411. t.Error(err)
  412. }
  413. dm := make(map[string]interface{})
  414. if e := json.Unmarshal(tt.data, &dm); e != nil {
  415. log.Fatal(e)
  416. return
  417. } else {
  418. tuple := &xsql.Tuple{Message:dm}
  419. result := pp.Apply(nil, tuple)
  420. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  421. if rt, ok := result.(*xsql.Tuple); ok{
  422. if rtt, ok := rt.Message["abc"].(time.Time); ok{
  423. rt.Message["abc"] = rtt.UTC()
  424. }
  425. }
  426. if !reflect.DeepEqual(tt.result, result) {
  427. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  428. }
  429. }
  430. }
  431. }