preprocessor_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "log"
  9. "reflect"
  10. "testing"
  11. "time"
  12. )
  13. func TestPreprocessor_Apply(t *testing.T) {
  14. var tests = []struct {
  15. stmt *xsql.StreamStmt
  16. data []byte
  17. result interface{}
  18. }{
  19. //Basic type
  20. {
  21. stmt: &xsql.StreamStmt{
  22. Name: xsql.StreamName("demo"),
  23. StreamFields: []xsql.StreamField{
  24. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  25. },
  26. },
  27. data: []byte(`{"a": 6}`),
  28. result: nil,
  29. },
  30. {
  31. stmt: &xsql.StreamStmt{
  32. Name: xsql.StreamName("demo"),
  33. StreamFields: []xsql.StreamField{
  34. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  35. },
  36. },
  37. data: []byte(`{"abc": 6}`),
  38. result: &xsql.Tuple{Message: xsql.Message{
  39. "abc": int(6),
  40. },
  41. },
  42. },
  43. {
  44. stmt: &xsql.StreamStmt{
  45. Name: xsql.StreamName("demo"),
  46. StreamFields: []xsql.StreamField{
  47. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  48. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  49. },
  50. },
  51. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  52. result: &xsql.Tuple{Message: xsql.Message{
  53. "abc": float64(34),
  54. "def": "hello",
  55. },
  56. },
  57. },
  58. {
  59. stmt: &xsql.StreamStmt{
  60. Name: xsql.StreamName("demo"),
  61. StreamFields: []xsql.StreamField{
  62. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  63. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  64. },
  65. },
  66. data: []byte(`{"abc": 77, "def" : "hello"}`),
  67. result: nil,
  68. },
  69. {
  70. stmt: &xsql.StreamStmt{
  71. Name: xsql.StreamName("demo"),
  72. StreamFields: []xsql.StreamField{
  73. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  74. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  75. },
  76. },
  77. data: []byte(`{"a": {"b" : "hello"}}`),
  78. result: nil,
  79. },
  80. //Rec type
  81. {
  82. stmt: &xsql.StreamStmt{
  83. Name: xsql.StreamName("demo"),
  84. StreamFields: []xsql.StreamField{
  85. {Name: "a", FieldType: &xsql.RecType{
  86. StreamFields: []xsql.StreamField{
  87. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  88. },
  89. }},
  90. },
  91. },
  92. data: []byte(`{"a": {"b" : "hello"}}`),
  93. result: &xsql.Tuple{Message: xsql.Message{
  94. "a": map[string]interface{}{
  95. "b": "hello",
  96. },
  97. },
  98. },
  99. },
  100. //Array of complex type
  101. {
  102. stmt: &xsql.StreamStmt{
  103. Name: xsql.StreamName("demo"),
  104. StreamFields: []xsql.StreamField{
  105. {Name: "a", FieldType: &xsql.ArrayType{
  106. Type: xsql.STRUCT,
  107. FieldType: &xsql.RecType{
  108. StreamFields: []xsql.StreamField{
  109. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  110. },
  111. },
  112. }},
  113. },
  114. },
  115. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  116. result: &xsql.Tuple{Message: xsql.Message{
  117. "a": []map[string]interface{}{
  118. {"b": "hello1"},
  119. {"b": "hello2"},
  120. },
  121. },
  122. },
  123. },
  124. //Rec of complex type
  125. {
  126. stmt: &xsql.StreamStmt{
  127. Name: xsql.StreamName("demo"),
  128. StreamFields: []xsql.StreamField{
  129. {Name: "a", FieldType: &xsql.RecType{
  130. StreamFields: []xsql.StreamField{
  131. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  132. {Name: "c", FieldType: &xsql.RecType{
  133. StreamFields: []xsql.StreamField{
  134. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  135. },
  136. }},
  137. },
  138. }},
  139. },
  140. },
  141. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  142. result: &xsql.Tuple{Message: xsql.Message{
  143. "a": map[string]interface{}{
  144. "b": "hello",
  145. "c": map[string]interface{}{
  146. "d": int(35),
  147. },
  148. },
  149. },
  150. },
  151. },
  152. }
  153. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  154. defer common.CloseLogger()
  155. contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
  156. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  157. for i, tt := range tests {
  158. pp := &Preprocessor{streamStmt: tt.stmt}
  159. dm := make(map[string]interface{})
  160. if e := json.Unmarshal(tt.data, &dm); e != nil {
  161. log.Fatal(e)
  162. return
  163. } else {
  164. tuple := &xsql.Tuple{Message:dm}
  165. result := pp.Apply(ctx, tuple)
  166. if !reflect.DeepEqual(tt.result, result) {
  167. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  168. }
  169. }
  170. }
  171. }
  172. func TestPreprocessorTime_Apply(t *testing.T){
  173. var tests = []struct {
  174. stmt *xsql.StreamStmt
  175. data []byte
  176. result interface{}
  177. }{
  178. {
  179. stmt: &xsql.StreamStmt{
  180. Name: xsql.StreamName("demo"),
  181. StreamFields: []xsql.StreamField{
  182. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  183. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  184. },
  185. },
  186. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  187. result: &xsql.Tuple{Message: xsql.Message{
  188. "abc": common.TimeFromUnixMilli(1568854515000),
  189. "def": common.TimeFromUnixMilli(1568854573431),
  190. },
  191. },
  192. },
  193. {
  194. stmt: &xsql.StreamStmt{
  195. Name: xsql.StreamName("demo"),
  196. StreamFields: []xsql.StreamField{
  197. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  198. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  199. },
  200. },
  201. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  202. result: nil,
  203. },
  204. {
  205. stmt: &xsql.StreamStmt{
  206. Name: xsql.StreamName("demo"),
  207. StreamFields: []xsql.StreamField{
  208. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  209. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  210. },
  211. Options: map[string]string{
  212. "DATASOURCE" : "users",
  213. "FORMAT" : "AVRO",
  214. "KEY" : "USERID",
  215. "CONF_KEY" : "srv1",
  216. "TYPE" : "MQTT",
  217. "TIMESTAMP" : "USERID",
  218. "TIMESTAMP_FORMAT" : "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  219. },
  220. },
  221. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  222. result: &xsql.Tuple{Message: xsql.Message{
  223. "abc": common.TimeFromUnixMilli(1568894115000),
  224. "def": common.TimeFromUnixMilli(1568854573431),
  225. }},
  226. },
  227. //Array type
  228. {
  229. stmt: &xsql.StreamStmt{
  230. Name: xsql.StreamName("demo"),
  231. StreamFields: []xsql.StreamField{
  232. {Name: "a", FieldType: &xsql.ArrayType{
  233. Type: xsql.DATETIME,
  234. }},
  235. },
  236. },
  237. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  238. result: &xsql.Tuple{Message: xsql.Message{
  239. "a": []time.Time{
  240. common.TimeFromUnixMilli(1568854515123),
  241. common.TimeFromUnixMilli(1568854573431),
  242. },
  243. },
  244. },
  245. },
  246. {
  247. stmt: &xsql.StreamStmt{
  248. Name: xsql.StreamName("demo"),
  249. StreamFields: []xsql.StreamField{
  250. {Name: "a", FieldType: &xsql.RecType{
  251. StreamFields: []xsql.StreamField{
  252. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  253. {Name: "c", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  254. },
  255. }},
  256. },
  257. },
  258. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  259. result: &xsql.Tuple{Message: xsql.Message{
  260. "a": map[string]interface{}{
  261. "b": "hello",
  262. "c": common.TimeFromUnixMilli(1568854515000),
  263. },
  264. },
  265. },
  266. },
  267. }
  268. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  269. defer common.CloseLogger()
  270. contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
  271. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  272. for i, tt := range tests {
  273. pp := &Preprocessor{streamStmt: tt.stmt}
  274. dm := make(map[string]interface{})
  275. if e := json.Unmarshal(tt.data, &dm); e != nil {
  276. log.Fatal(e)
  277. return
  278. } else {
  279. tuple := &xsql.Tuple{Message:dm}
  280. result := pp.Apply(ctx, tuple)
  281. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  282. if rt, ok := result.(*xsql.Tuple); ok{
  283. if rtt, ok := rt.Message["abc"].(time.Time); ok{
  284. rt.Message["abc"] = rtt.UTC()
  285. }
  286. }
  287. if !reflect.DeepEqual(tt.result, result) {
  288. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  289. }
  290. }
  291. }
  292. }
  293. func TestPreprocessorEventtime_Apply(t *testing.T) {
  294. var tests = []struct {
  295. stmt *xsql.StreamStmt
  296. data []byte
  297. result interface{}
  298. }{
  299. //Basic type
  300. {
  301. stmt: &xsql.StreamStmt{
  302. Name: xsql.StreamName("demo"),
  303. StreamFields: []xsql.StreamField{
  304. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  305. },
  306. Options: map[string]string{
  307. "DATASOURCE" : "users",
  308. "FORMAT" : "AVRO",
  309. "KEY" : "USERID",
  310. "CONF_KEY" : "srv1",
  311. "TYPE" : "MQTT",
  312. "TIMESTAMP" : "abc",
  313. "TIMESTAMP_FORMAT" : "yyyy-MM-dd''T''HH:mm:ssX'",
  314. },
  315. },
  316. data: []byte(`{"abc": 1568854515000}`),
  317. result: &xsql.Tuple{Message: xsql.Message{
  318. "abc": int(1568854515000),
  319. }, Timestamp: 1568854515000,
  320. },
  321. },
  322. {
  323. stmt: &xsql.StreamStmt{
  324. Name: xsql.StreamName("demo"),
  325. StreamFields: []xsql.StreamField{
  326. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  327. },
  328. Options: map[string]string{
  329. "DATASOURCE" : "users",
  330. "TIMESTAMP" : "abc",
  331. },
  332. },
  333. data: []byte(`{"abc": true}`),
  334. result: nil,
  335. },
  336. {
  337. stmt: &xsql.StreamStmt{
  338. Name: xsql.StreamName("demo"),
  339. StreamFields: []xsql.StreamField{
  340. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  341. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  342. },
  343. Options: map[string]string{
  344. "DATASOURCE" : "users",
  345. "TIMESTAMP" : "def",
  346. },
  347. },
  348. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  349. result: &xsql.Tuple{Message: xsql.Message{
  350. "abc": float64(34),
  351. "def": "2019-09-23T02:47:29.754Z",
  352. }, Timestamp: int64(1569206849754),
  353. },
  354. },
  355. {
  356. stmt: &xsql.StreamStmt{
  357. Name: xsql.StreamName("demo"),
  358. StreamFields: []xsql.StreamField{
  359. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  360. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  361. },
  362. Options: map[string]string{
  363. "DATASOURCE" : "users",
  364. "TIMESTAMP" : "abc",
  365. },
  366. },
  367. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  368. result: &xsql.Tuple{Message: xsql.Message{
  369. "abc": common.TimeFromUnixMilli(1568854515000),
  370. "def": common.TimeFromUnixMilli(1568854573431),
  371. }, Timestamp: int64(1568854515000),
  372. },
  373. },
  374. {
  375. stmt: &xsql.StreamStmt{
  376. Name: xsql.StreamName("demo"),
  377. StreamFields: []xsql.StreamField{
  378. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  379. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  380. },
  381. Options: map[string]string{
  382. "DATASOURCE" : "users",
  383. "TIMESTAMP" : "def",
  384. "TIMESTAMP_FORMAT" : "yyyy-MM-dd'AT'HH:mm:ss",
  385. },
  386. },
  387. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  388. result: &xsql.Tuple{Message: xsql.Message{
  389. "abc": float64(34),
  390. "def": "2019-09-23AT02:47:29",
  391. }, Timestamp: int64(1569206849000),
  392. },
  393. },
  394. {
  395. stmt: &xsql.StreamStmt{
  396. Name: xsql.StreamName("demo"),
  397. StreamFields: []xsql.StreamField{
  398. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  399. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  400. },
  401. Options: map[string]string{
  402. "DATASOURCE" : "users",
  403. "TIMESTAMP" : "def",
  404. "TIMESTAMP_FORMAT" : "yyyy-MM-ddaHH:mm:ss",
  405. },
  406. },
  407. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  408. result: nil,
  409. },
  410. }
  411. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  412. defer common.CloseLogger()
  413. contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  414. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  415. for i, tt := range tests {
  416. pp, err := NewPreprocessor(tt.stmt, nil,true)
  417. if err != nil{
  418. t.Error(err)
  419. }
  420. dm := make(map[string]interface{})
  421. if e := json.Unmarshal(tt.data, &dm); e != nil {
  422. log.Fatal(e)
  423. return
  424. } else {
  425. tuple := &xsql.Tuple{Message:dm}
  426. result := pp.Apply(ctx, tuple)
  427. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  428. if rt, ok := result.(*xsql.Tuple); ok{
  429. if rtt, ok := rt.Message["abc"].(time.Time); ok{
  430. rt.Message["abc"] = rtt.UTC()
  431. }
  432. }
  433. if !reflect.DeepEqual(tt.result, result) {
  434. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  435. }
  436. }
  437. }
  438. }