// File: preprocessor_test.go (28 KB)
  1. package operator
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/lf-edge/ekuiper/internal/conf"
  8. "github.com/lf-edge/ekuiper/internal/topo/context"
  9. "github.com/lf-edge/ekuiper/internal/xsql"
  10. "github.com/lf-edge/ekuiper/pkg/ast"
  11. "github.com/lf-edge/ekuiper/pkg/cast"
  12. "github.com/lf-edge/ekuiper/pkg/message"
  13. "io/ioutil"
  14. "log"
  15. "path"
  16. "reflect"
  17. "testing"
  18. "time"
  19. )
  20. func TestPreprocessor_Apply(t *testing.T) {
  21. var tests = []struct {
  22. stmt *ast.StreamStmt
  23. data []byte
  24. result interface{}
  25. }{
  26. //Basic type
  27. {
  28. stmt: &ast.StreamStmt{
  29. Name: ast.StreamName("demo"),
  30. StreamFields: []ast.StreamField{
  31. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  32. },
  33. },
  34. data: []byte(`{"a": 6}`),
  35. result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
  36. },
  37. {
  38. stmt: &ast.StreamStmt{
  39. Name: ast.StreamName("demo"),
  40. StreamFields: []ast.StreamField{
  41. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  42. },
  43. },
  44. data: []byte(`{"abc": null}`),
  45. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found <nil>(<nil>)"),
  46. },
  47. {
  48. stmt: &ast.StreamStmt{
  49. Name: ast.StreamName("demo"),
  50. StreamFields: nil,
  51. },
  52. data: []byte(`{"a": 6}`),
  53. result: &xsql.Tuple{Message: xsql.Message{
  54. "a": float64(6),
  55. },
  56. },
  57. },
  58. {
  59. stmt: &ast.StreamStmt{
  60. Name: ast.StreamName("demo"),
  61. StreamFields: []ast.StreamField{
  62. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  63. },
  64. },
  65. data: []byte(`{"abc": 6}`),
  66. result: &xsql.Tuple{Message: xsql.Message{
  67. "abc": 6,
  68. },
  69. },
  70. },
  71. {
  72. stmt: &ast.StreamStmt{
  73. Name: ast.StreamName("demo"),
  74. StreamFields: nil,
  75. },
  76. data: []byte(`{"abc": 6}`),
  77. result: &xsql.Tuple{Message: xsql.Message{
  78. "abc": float64(6),
  79. },
  80. },
  81. },
  82. {
  83. stmt: &ast.StreamStmt{
  84. Name: ast.StreamName("demo"),
  85. StreamFields: []ast.StreamField{
  86. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  87. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  88. },
  89. },
  90. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  91. result: &xsql.Tuple{Message: xsql.Message{
  92. "abc": float64(34),
  93. "def": "hello",
  94. },
  95. },
  96. },
  97. {
  98. stmt: &ast.StreamStmt{
  99. Name: ast.StreamName("demo"),
  100. StreamFields: nil,
  101. },
  102. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  103. result: &xsql.Tuple{Message: xsql.Message{
  104. "abc": float64(34),
  105. "def": "hello",
  106. "ghi": float64(50),
  107. },
  108. },
  109. },
  110. {
  111. stmt: &ast.StreamStmt{
  112. Name: ast.StreamName("demo"),
  113. StreamFields: []ast.StreamField{
  114. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  115. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  116. },
  117. },
  118. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  119. result: &xsql.Tuple{Message: xsql.Message{
  120. "abc": float64(34),
  121. "def": "hello",
  122. },
  123. },
  124. },
  125. {
  126. stmt: &ast.StreamStmt{
  127. Name: ast.StreamName("demo"),
  128. StreamFields: []ast.StreamField{
  129. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  130. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  131. },
  132. },
  133. data: []byte(`{"abc": 77, "def" : "hello"}`),
  134. result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
  135. },
  136. {
  137. stmt: &ast.StreamStmt{
  138. Name: ast.StreamName("demo"),
  139. StreamFields: []ast.StreamField{
  140. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  141. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  142. },
  143. },
  144. data: []byte(`{"a": {"b" : "hello"}}`),
  145. result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
  146. },
  147. {
  148. stmt: &ast.StreamStmt{
  149. Name: ast.StreamName("demo"),
  150. StreamFields: nil,
  151. },
  152. data: []byte(`{"a": {"b" : "hello"}}`),
  153. result: &xsql.Tuple{Message: xsql.Message{
  154. "a": map[string]interface{}{
  155. "b": "hello",
  156. },
  157. },
  158. },
  159. },
  160. //Rec type
  161. {
  162. stmt: &ast.StreamStmt{
  163. Name: ast.StreamName("demo"),
  164. StreamFields: []ast.StreamField{
  165. {Name: "a", FieldType: &ast.RecType{
  166. StreamFields: []ast.StreamField{
  167. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  168. },
  169. }},
  170. },
  171. },
  172. data: []byte(`{"a": {"b" : "hello"}}`),
  173. result: &xsql.Tuple{Message: xsql.Message{
  174. "a": map[string]interface{}{
  175. "b": "hello",
  176. },
  177. },
  178. },
  179. },
  180. {
  181. stmt: &ast.StreamStmt{
  182. Name: ast.StreamName("demo"),
  183. StreamFields: []ast.StreamField{
  184. {Name: "a", FieldType: &ast.RecType{
  185. StreamFields: []ast.StreamField{
  186. {Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  187. },
  188. }},
  189. },
  190. },
  191. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  192. result: &xsql.Tuple{Message: xsql.Message{
  193. "a": map[string]interface{}{
  194. "b": float64(32),
  195. },
  196. },
  197. },
  198. },
  199. {
  200. stmt: &ast.StreamStmt{
  201. Name: ast.StreamName("demo"),
  202. StreamFields: nil,
  203. },
  204. data: []byte(`{"a": {"b" : "32"}}`),
  205. result: &xsql.Tuple{Message: xsql.Message{
  206. "a": map[string]interface{}{
  207. "b": "32",
  208. },
  209. },
  210. },
  211. },
  212. //Array of complex type
  213. {
  214. stmt: &ast.StreamStmt{
  215. Name: ast.StreamName("demo"),
  216. StreamFields: []ast.StreamField{
  217. {Name: "a", FieldType: &ast.ArrayType{
  218. Type: ast.STRUCT,
  219. FieldType: &ast.RecType{
  220. StreamFields: []ast.StreamField{
  221. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  222. },
  223. },
  224. }},
  225. },
  226. },
  227. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  228. result: &xsql.Tuple{Message: xsql.Message{
  229. "a": []map[string]interface{}{
  230. {"b": "hello1"},
  231. {"b": "hello2"},
  232. },
  233. },
  234. },
  235. },
  236. {
  237. stmt: &ast.StreamStmt{
  238. Name: ast.StreamName("demo"),
  239. StreamFields: []ast.StreamField{
  240. {Name: "a", FieldType: &ast.ArrayType{
  241. Type: ast.STRUCT,
  242. FieldType: &ast.RecType{
  243. StreamFields: []ast.StreamField{
  244. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  245. },
  246. },
  247. }},
  248. },
  249. },
  250. data: []byte(`{"a": []}`),
  251. result: &xsql.Tuple{Message: xsql.Message{
  252. "a": make([]map[string]interface{}, 0),
  253. },
  254. },
  255. },
  256. {
  257. stmt: &ast.StreamStmt{
  258. Name: ast.StreamName("demo"),
  259. StreamFields: []ast.StreamField{
  260. {Name: "a", FieldType: &ast.ArrayType{
  261. Type: ast.STRUCT,
  262. FieldType: &ast.RecType{
  263. StreamFields: []ast.StreamField{
  264. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  265. },
  266. },
  267. }},
  268. },
  269. },
  270. data: []byte(`{"a": null}`),
  271. result: &xsql.Tuple{Message: xsql.Message{
  272. "a": []map[string]interface{}(nil),
  273. },
  274. },
  275. },
  276. {
  277. stmt: &ast.StreamStmt{
  278. Name: ast.StreamName("demo"),
  279. StreamFields: []ast.StreamField{
  280. {Name: "a", FieldType: &ast.ArrayType{
  281. Type: ast.STRUCT,
  282. FieldType: &ast.RecType{
  283. StreamFields: []ast.StreamField{
  284. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  285. },
  286. },
  287. }},
  288. },
  289. },
  290. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  291. result: &xsql.Tuple{Message: xsql.Message{
  292. "a": []map[string]interface{}{
  293. nil,
  294. {"b": "hello2"},
  295. },
  296. },
  297. },
  298. },
  299. {
  300. stmt: &ast.StreamStmt{
  301. Name: ast.StreamName("demo"),
  302. StreamFields: []ast.StreamField{
  303. {Name: "a", FieldType: &ast.ArrayType{
  304. Type: ast.ARRAY,
  305. FieldType: &ast.ArrayType{
  306. Type: ast.BIGINT,
  307. },
  308. }},
  309. },
  310. },
  311. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  312. result: &xsql.Tuple{Message: xsql.Message{
  313. "a": [][]int{
  314. {50, 60, 70},
  315. {66},
  316. {77},
  317. },
  318. },
  319. },
  320. },
  321. {
  322. stmt: &ast.StreamStmt{
  323. Name: ast.StreamName("demo"),
  324. StreamFields: []ast.StreamField{
  325. {Name: "a", FieldType: &ast.ArrayType{
  326. Type: ast.ARRAY,
  327. FieldType: &ast.ArrayType{
  328. Type: ast.BIGINT,
  329. },
  330. }},
  331. },
  332. },
  333. data: []byte(`{"a": [null, [66], [77]]}`),
  334. result: &xsql.Tuple{Message: xsql.Message{
  335. "a": [][]int{
  336. []int(nil),
  337. {66},
  338. {77},
  339. },
  340. },
  341. },
  342. },
  343. {
  344. stmt: &ast.StreamStmt{
  345. Name: ast.StreamName("demo"),
  346. StreamFields: nil,
  347. },
  348. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  349. result: &xsql.Tuple{Message: xsql.Message{
  350. "a": []interface{}{
  351. map[string]interface{}{"b": "hello1"},
  352. map[string]interface{}{"b": "hello2"},
  353. },
  354. },
  355. },
  356. },
  357. {
  358. stmt: &ast.StreamStmt{
  359. Name: ast.StreamName("demo"),
  360. StreamFields: []ast.StreamField{
  361. {Name: "a", FieldType: &ast.ArrayType{
  362. Type: ast.FLOAT,
  363. }},
  364. },
  365. },
  366. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  367. result: &xsql.Tuple{Message: xsql.Message{
  368. "a": []float64{
  369. 55,
  370. 77,
  371. },
  372. },
  373. },
  374. },
  375. {
  376. stmt: &ast.StreamStmt{
  377. Name: ast.StreamName("demo"),
  378. StreamFields: nil,
  379. },
  380. data: []byte(`{"a": [55, 77]}`),
  381. result: &xsql.Tuple{Message: xsql.Message{
  382. "a": []interface{}{
  383. float64(55),
  384. float64(77),
  385. },
  386. },
  387. },
  388. },
  389. //Rec of complex type
  390. {
  391. stmt: &ast.StreamStmt{
  392. Name: ast.StreamName("demo"),
  393. StreamFields: []ast.StreamField{
  394. {Name: "a", FieldType: &ast.RecType{
  395. StreamFields: []ast.StreamField{
  396. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  397. {Name: "c", FieldType: &ast.RecType{
  398. StreamFields: []ast.StreamField{
  399. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  400. },
  401. }},
  402. },
  403. }},
  404. },
  405. },
  406. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  407. result: &xsql.Tuple{Message: xsql.Message{
  408. "a": map[string]interface{}{
  409. "b": "hello",
  410. "c": map[string]interface{}{
  411. "d": 35,
  412. },
  413. },
  414. },
  415. },
  416. },
  417. {
  418. stmt: &ast.StreamStmt{
  419. Name: ast.StreamName("demo"),
  420. StreamFields: []ast.StreamField{
  421. {Name: "a", FieldType: &ast.RecType{
  422. StreamFields: []ast.StreamField{
  423. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  424. {Name: "c", FieldType: &ast.RecType{
  425. StreamFields: []ast.StreamField{
  426. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  427. },
  428. }},
  429. },
  430. }},
  431. },
  432. },
  433. data: []byte(`{"a": null}`),
  434. result: &xsql.Tuple{Message: xsql.Message{
  435. "a": map[string]interface{}(nil),
  436. },
  437. },
  438. },
  439. {
  440. stmt: &ast.StreamStmt{
  441. Name: ast.StreamName("demo"),
  442. StreamFields: []ast.StreamField{
  443. {Name: "a", FieldType: &ast.RecType{
  444. StreamFields: []ast.StreamField{
  445. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  446. {Name: "c", FieldType: &ast.ArrayType{
  447. Type: ast.FLOAT,
  448. }},
  449. },
  450. }},
  451. },
  452. },
  453. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  454. result: &xsql.Tuple{Message: xsql.Message{
  455. "a": map[string]interface{}{
  456. "b": "hello",
  457. "c": []float64{
  458. 35.2, 38.2,
  459. },
  460. },
  461. },
  462. },
  463. },
  464. {
  465. stmt: &ast.StreamStmt{
  466. Name: ast.StreamName("demo"),
  467. StreamFields: []ast.StreamField{
  468. {Name: "a", FieldType: &ast.RecType{
  469. StreamFields: []ast.StreamField{
  470. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  471. {Name: "c", FieldType: &ast.ArrayType{
  472. Type: ast.FLOAT,
  473. }},
  474. },
  475. }},
  476. },
  477. },
  478. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  479. result: &xsql.Tuple{Message: xsql.Message{
  480. "a": map[string]interface{}{
  481. "b": "hello",
  482. "c": []float64(nil),
  483. },
  484. },
  485. },
  486. },
  487. {
  488. stmt: &ast.StreamStmt{
  489. Name: ast.StreamName("demo"),
  490. StreamFields: []ast.StreamField{
  491. {Name: "a", FieldType: &ast.RecType{
  492. StreamFields: []ast.StreamField{
  493. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  494. {Name: "c", FieldType: &ast.ArrayType{
  495. Type: ast.FLOAT,
  496. }},
  497. },
  498. }},
  499. },
  500. },
  501. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  502. result: errors.New("error in preprocessor: fail to parse field c: invalid data type for [0], expect float but found <nil>(<nil>)"),
  503. },
  504. {
  505. stmt: &ast.StreamStmt{
  506. Name: ast.StreamName("demo"),
  507. StreamFields: nil,
  508. },
  509. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  510. result: &xsql.Tuple{Message: xsql.Message{
  511. "a": map[string]interface{}{
  512. "b": "hello",
  513. "c": map[string]interface{}{
  514. "d": 35.2,
  515. },
  516. },
  517. },
  518. },
  519. },
  520. }
  521. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  522. defer conf.CloseLogger()
  523. contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
  524. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  525. for i, tt := range tests {
  526. pp := &Preprocessor{}
  527. pp.streamFields = convertFields(tt.stmt.StreamFields)
  528. dm := make(map[string]interface{})
  529. if e := json.Unmarshal(tt.data, &dm); e != nil {
  530. log.Fatal(e)
  531. return
  532. } else {
  533. tuple := &xsql.Tuple{Message: dm}
  534. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  535. result := pp.Apply(ctx, tuple, fv, afv)
  536. if !reflect.DeepEqual(tt.result, result) {
  537. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  538. }
  539. }
  540. }
  541. }
  542. func TestPreprocessorTime_Apply(t *testing.T) {
  543. var tests = []struct {
  544. stmt *ast.StreamStmt
  545. data []byte
  546. result interface{}
  547. }{
  548. {
  549. stmt: &ast.StreamStmt{
  550. Name: ast.StreamName("demo"),
  551. StreamFields: []ast.StreamField{
  552. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  553. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  554. },
  555. },
  556. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  557. result: &xsql.Tuple{Message: xsql.Message{
  558. "abc": cast.TimeFromUnixMilli(1568854515000),
  559. "def": cast.TimeFromUnixMilli(1568854573431),
  560. },
  561. },
  562. },
  563. {
  564. stmt: &ast.StreamStmt{
  565. Name: ast.StreamName("demo"),
  566. StreamFields: nil,
  567. },
  568. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  569. result: &xsql.Tuple{Message: xsql.Message{
  570. "abc": "2019-09-19T00:55:15.000Z",
  571. "def": float64(1568854573431),
  572. },
  573. },
  574. },
  575. {
  576. stmt: &ast.StreamStmt{
  577. Name: ast.StreamName("demo"),
  578. StreamFields: []ast.StreamField{
  579. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  580. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  581. },
  582. },
  583. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  584. result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  585. },
  586. {
  587. stmt: &ast.StreamStmt{
  588. Name: ast.StreamName("demo"),
  589. StreamFields: []ast.StreamField{
  590. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  591. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  592. },
  593. Options: &ast.Options{
  594. DATASOURCE: "users",
  595. FORMAT: "JSON",
  596. KEY: "USERID",
  597. CONF_KEY: "srv1",
  598. TYPE: "MQTT",
  599. TIMESTAMP: "USERID",
  600. TIMESTAMP_FORMAT: "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  601. },
  602. },
  603. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  604. result: &xsql.Tuple{Message: xsql.Message{
  605. "abc": cast.TimeFromUnixMilli(1568894115000),
  606. "def": cast.TimeFromUnixMilli(1568854573431),
  607. }},
  608. },
  609. //Array type
  610. {
  611. stmt: &ast.StreamStmt{
  612. Name: ast.StreamName("demo"),
  613. StreamFields: []ast.StreamField{
  614. {Name: "a", FieldType: &ast.ArrayType{
  615. Type: ast.DATETIME,
  616. }},
  617. },
  618. },
  619. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  620. result: &xsql.Tuple{Message: xsql.Message{
  621. "a": []time.Time{
  622. cast.TimeFromUnixMilli(1568854515123),
  623. cast.TimeFromUnixMilli(1568854573431),
  624. },
  625. },
  626. },
  627. },
  628. {
  629. stmt: &ast.StreamStmt{
  630. Name: ast.StreamName("demo"),
  631. StreamFields: []ast.StreamField{
  632. {Name: "a", FieldType: &ast.RecType{
  633. StreamFields: []ast.StreamField{
  634. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  635. {Name: "c", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  636. },
  637. }},
  638. },
  639. },
  640. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  641. result: &xsql.Tuple{Message: xsql.Message{
  642. "a": map[string]interface{}{
  643. "b": "hello",
  644. "c": cast.TimeFromUnixMilli(1568854515000),
  645. },
  646. },
  647. },
  648. },
  649. }
  650. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  651. defer conf.CloseLogger()
  652. contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
  653. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  654. for i, tt := range tests {
  655. pp := &Preprocessor{}
  656. pp.streamFields = convertFields(tt.stmt.StreamFields)
  657. if tt.stmt.Options != nil {
  658. pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
  659. }
  660. dm := make(map[string]interface{})
  661. if e := json.Unmarshal(tt.data, &dm); e != nil {
  662. log.Fatal(e)
  663. return
  664. } else {
  665. tuple := &xsql.Tuple{Message: dm}
  666. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  667. result := pp.Apply(ctx, tuple, fv, afv)
  668. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  669. if rt, ok := result.(*xsql.Tuple); ok {
  670. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  671. rt.Message["abc"] = rtt.UTC()
  672. }
  673. }
  674. if !reflect.DeepEqual(tt.result, result) {
  675. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  676. }
  677. }
  678. }
  679. }
  680. func convertFields(o ast.StreamFields) []interface{} {
  681. if o == nil {
  682. return nil
  683. }
  684. fields := make([]interface{}, len(o))
  685. for i := range o {
  686. fields[i] = &o[i]
  687. }
  688. return fields
  689. }
  690. func TestPreprocessorEventtime_Apply(t *testing.T) {
  691. var tests = []struct {
  692. stmt *ast.StreamStmt
  693. data []byte
  694. result interface{}
  695. }{
  696. //Basic type
  697. {
  698. stmt: &ast.StreamStmt{
  699. Name: ast.StreamName("demo"),
  700. StreamFields: []ast.StreamField{
  701. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  702. },
  703. Options: &ast.Options{
  704. DATASOURCE: "users",
  705. FORMAT: "JSON",
  706. KEY: "USERID",
  707. CONF_KEY: "srv1",
  708. TYPE: "MQTT",
  709. TIMESTAMP: "abc",
  710. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  711. },
  712. },
  713. data: []byte(`{"abc": 1568854515000}`),
  714. result: &xsql.Tuple{Message: xsql.Message{
  715. "abc": 1568854515000,
  716. }, Timestamp: 1568854515000,
  717. },
  718. },
  719. {
  720. stmt: &ast.StreamStmt{
  721. Name: ast.StreamName("demo"),
  722. StreamFields: nil,
  723. Options: &ast.Options{
  724. DATASOURCE: "users",
  725. FORMAT: "JSON",
  726. KEY: "USERID",
  727. CONF_KEY: "srv1",
  728. TYPE: "MQTT",
  729. TIMESTAMP: "abc",
  730. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  731. },
  732. },
  733. data: []byte(`{"abc": 1568854515000}`),
  734. result: &xsql.Tuple{Message: xsql.Message{
  735. "abc": float64(1568854515000),
  736. }, Timestamp: 1568854515000,
  737. },
  738. },
  739. {
  740. stmt: &ast.StreamStmt{
  741. Name: ast.StreamName("demo"),
  742. StreamFields: []ast.StreamField{
  743. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  744. },
  745. Options: &ast.Options{
  746. DATASOURCE: "users",
  747. TIMESTAMP: "abc",
  748. },
  749. },
  750. data: []byte(`{"abc": true}`),
  751. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  752. },
  753. {
  754. stmt: &ast.StreamStmt{
  755. Name: ast.StreamName("demo"),
  756. StreamFields: []ast.StreamField{
  757. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  758. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  759. },
  760. Options: &ast.Options{
  761. DATASOURCE: "users",
  762. TIMESTAMP: "def",
  763. },
  764. },
  765. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  766. result: &xsql.Tuple{Message: xsql.Message{
  767. "abc": float64(34),
  768. "def": "2019-09-23T02:47:29.754Z",
  769. }, Timestamp: int64(1569206849754),
  770. },
  771. },
  772. {
  773. stmt: &ast.StreamStmt{
  774. Name: ast.StreamName("demo"),
  775. StreamFields: []ast.StreamField{
  776. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  777. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  778. },
  779. Options: &ast.Options{
  780. DATASOURCE: "users",
  781. TIMESTAMP: "abc",
  782. },
  783. },
  784. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  785. result: &xsql.Tuple{Message: xsql.Message{
  786. "abc": cast.TimeFromUnixMilli(1568854515000),
  787. "def": cast.TimeFromUnixMilli(1568854573431),
  788. }, Timestamp: int64(1568854515000),
  789. },
  790. },
  791. {
  792. stmt: &ast.StreamStmt{
  793. Name: ast.StreamName("demo"),
  794. StreamFields: []ast.StreamField{
  795. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  796. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  797. },
  798. Options: &ast.Options{
  799. DATASOURCE: "users",
  800. TIMESTAMP: "def",
  801. TIMESTAMP_FORMAT: "yyyy-MM-dd'AT'HH:mm:ss",
  802. },
  803. },
  804. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  805. result: &xsql.Tuple{Message: xsql.Message{
  806. "abc": float64(34),
  807. "def": "2019-09-23AT02:47:29",
  808. }, Timestamp: int64(1569206849000),
  809. },
  810. },
  811. {
  812. stmt: &ast.StreamStmt{
  813. Name: ast.StreamName("demo"),
  814. StreamFields: []ast.StreamField{
  815. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  816. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  817. },
  818. Options: &ast.Options{
  819. DATASOURCE: "users",
  820. TIMESTAMP: "def",
  821. TIMESTAMP_FORMAT: "yyyy-MM-ddaHH:mm:ss",
  822. },
  823. },
  824. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  825. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
  826. },
  827. }
  828. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  829. defer conf.CloseLogger()
  830. contextLogger := conf.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  831. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  832. for i, tt := range tests {
  833. pp := &Preprocessor{
  834. defaultFieldProcessor: defaultFieldProcessor{
  835. streamFields: convertFields(tt.stmt.StreamFields),
  836. isBinary: false,
  837. timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
  838. },
  839. isEventTime: true,
  840. timestampField: tt.stmt.Options.TIMESTAMP,
  841. }
  842. dm := make(map[string]interface{})
  843. if e := json.Unmarshal(tt.data, &dm); e != nil {
  844. log.Fatal(e)
  845. return
  846. } else {
  847. tuple := &xsql.Tuple{Message: dm}
  848. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  849. result := pp.Apply(ctx, tuple, fv, afv)
  850. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  851. if rt, ok := result.(*xsql.Tuple); ok {
  852. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  853. rt.Message["abc"] = rtt.UTC()
  854. }
  855. }
  856. if !reflect.DeepEqual(tt.result, result) {
  857. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  858. }
  859. }
  860. }
  861. }
  862. func TestPreprocessorError(t *testing.T) {
  863. tests := []struct {
  864. stmt *ast.StreamStmt
  865. data []byte
  866. result interface{}
  867. }{
  868. {
  869. stmt: &ast.StreamStmt{
  870. Name: ast.StreamName("demo"),
  871. StreamFields: []ast.StreamField{
  872. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  873. },
  874. },
  875. data: []byte(`{"abc": "dafsad"}`),
  876. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
  877. }, {
  878. stmt: &ast.StreamStmt{
  879. Name: ast.StreamName("demo"),
  880. StreamFields: []ast.StreamField{
  881. {Name: "a", FieldType: &ast.RecType{
  882. StreamFields: []ast.StreamField{
  883. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  884. },
  885. }},
  886. },
  887. },
  888. data: []byte(`{"a": {"d" : "hello"}}`),
  889. result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
  890. }, {
  891. stmt: &ast.StreamStmt{
  892. Name: ast.StreamName("demo"),
  893. StreamFields: []ast.StreamField{
  894. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  895. },
  896. Options: &ast.Options{
  897. DATASOURCE: "users",
  898. FORMAT: "JSON",
  899. KEY: "USERID",
  900. CONF_KEY: "srv1",
  901. TYPE: "MQTT",
  902. TIMESTAMP: "abc",
  903. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  904. },
  905. },
  906. data: []byte(`{"abc": "not a time"}`),
  907. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
  908. },
  909. }
  910. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  911. defer conf.CloseLogger()
  912. contextLogger := conf.Log.WithField("rule", "TestPreprocessorError")
  913. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  914. for i, tt := range tests {
  915. pp := &Preprocessor{}
  916. pp.streamFields = convertFields(tt.stmt.StreamFields)
  917. dm := make(map[string]interface{})
  918. if e := json.Unmarshal(tt.data, &dm); e != nil {
  919. log.Fatal(e)
  920. return
  921. } else {
  922. tuple := &xsql.Tuple{Message: dm}
  923. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  924. result := pp.Apply(ctx, tuple, fv, afv)
  925. if !reflect.DeepEqual(tt.result, result) {
  926. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  927. }
  928. }
  929. }
  930. }
  931. func TestPreprocessorForBinary(t *testing.T) {
  932. docsFolder, err := conf.GetLoc("docs/")
  933. if err != nil {
  934. t.Errorf("Cannot find docs folder: %v", err)
  935. }
  936. image, err := ioutil.ReadFile(path.Join(docsFolder, "cover.jpg"))
  937. if err != nil {
  938. t.Errorf("Cannot read image: %v", err)
  939. }
  940. b64img := base64.StdEncoding.EncodeToString(image)
  941. //TODO test bytea type conversion to string or else
  942. var tests = []struct {
  943. stmt *ast.StreamStmt
  944. data []byte
  945. isBinary bool
  946. result interface{}
  947. }{
  948. {
  949. stmt: &ast.StreamStmt{
  950. Name: ast.StreamName("demo"),
  951. StreamFields: nil,
  952. },
  953. data: image,
  954. isBinary: true,
  955. result: &xsql.Tuple{Message: xsql.Message{
  956. "self": image,
  957. },
  958. },
  959. },
  960. {
  961. stmt: &ast.StreamStmt{
  962. Name: ast.StreamName("demo"),
  963. StreamFields: []ast.StreamField{
  964. {Name: "img", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  965. },
  966. },
  967. data: image,
  968. isBinary: true,
  969. result: &xsql.Tuple{Message: xsql.Message{
  970. "img": image,
  971. },
  972. },
  973. },
  974. {
  975. stmt: &ast.StreamStmt{
  976. Name: ast.StreamName("demo"),
  977. StreamFields: []ast.StreamField{
  978. {Name: "a", FieldType: &ast.RecType{
  979. StreamFields: []ast.StreamField{
  980. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  981. },
  982. }},
  983. },
  984. },
  985. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  986. result: &xsql.Tuple{Message: xsql.Message{
  987. "a": map[string]interface{}{
  988. "b": image,
  989. },
  990. },
  991. },
  992. },
  993. {
  994. stmt: &ast.StreamStmt{
  995. Name: ast.StreamName("demo"),
  996. StreamFields: []ast.StreamField{
  997. {Name: "a", FieldType: &ast.ArrayType{
  998. Type: ast.BYTEA,
  999. }},
  1000. },
  1001. },
  1002. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  1003. result: &xsql.Tuple{Message: xsql.Message{
  1004. "a": [][]byte{
  1005. image,
  1006. },
  1007. },
  1008. },
  1009. },
  1010. {
  1011. stmt: &ast.StreamStmt{
  1012. Name: ast.StreamName("demo"),
  1013. StreamFields: []ast.StreamField{
  1014. {Name: "a", FieldType: &ast.ArrayType{
  1015. Type: ast.STRUCT,
  1016. FieldType: &ast.RecType{
  1017. StreamFields: []ast.StreamField{
  1018. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  1019. },
  1020. },
  1021. }},
  1022. },
  1023. },
  1024. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  1025. result: &xsql.Tuple{Message: xsql.Message{
  1026. "a": []map[string]interface{}{
  1027. {"b": image},
  1028. },
  1029. },
  1030. },
  1031. },
  1032. }
  1033. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1034. defer conf.CloseLogger()
  1035. contextLogger := conf.Log.WithField("rule", "TestPreprocessorForBinary")
  1036. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1037. for i, tt := range tests {
  1038. pp := &Preprocessor{}
  1039. pp.streamFields = convertFields(tt.stmt.StreamFields)
  1040. pp.isBinary = tt.isBinary
  1041. format := "json"
  1042. if tt.isBinary {
  1043. format = "binary"
  1044. }
  1045. if dm, e := message.Decode(tt.data, format); e != nil {
  1046. log.Fatal(e)
  1047. return
  1048. } else {
  1049. tuple := &xsql.Tuple{Message: dm}
  1050. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1051. result := pp.Apply(ctx, tuple, fv, afv)
  1052. if !reflect.DeepEqual(tt.result, result) {
  1053. t.Errorf("%d. %q\n\nresult mismatch", i, tuple)
  1054. }
  1055. }
  1056. }
  1057. }