preprocessor_test.go 29 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/topo/context"
  22. "github.com/lf-edge/ekuiper/internal/xsql"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "github.com/lf-edge/ekuiper/pkg/cast"
  25. "github.com/lf-edge/ekuiper/pkg/message"
  26. "io/ioutil"
  27. "log"
  28. "path"
  29. "reflect"
  30. "testing"
  31. "time"
  32. )
  33. func TestPreprocessor_Apply(t *testing.T) {
  34. var tests = []struct {
  35. stmt *ast.StreamStmt
  36. data []byte
  37. result interface{}
  38. }{
  39. //Basic type
  40. {
  41. stmt: &ast.StreamStmt{
  42. Name: ast.StreamName("demo"),
  43. StreamFields: []ast.StreamField{
  44. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  45. },
  46. },
  47. data: []byte(`{"a": 6}`),
  48. result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
  49. },
  50. {
  51. stmt: &ast.StreamStmt{
  52. Name: ast.StreamName("demo"),
  53. StreamFields: []ast.StreamField{
  54. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  55. },
  56. },
  57. data: []byte(`{"abc": null}`),
  58. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found <nil>(<nil>)"),
  59. },
  60. {
  61. stmt: &ast.StreamStmt{
  62. Name: ast.StreamName("demo"),
  63. StreamFields: nil,
  64. },
  65. data: []byte(`{"a": 6}`),
  66. result: &xsql.Tuple{Message: xsql.Message{
  67. "a": float64(6),
  68. },
  69. },
  70. },
  71. {
  72. stmt: &ast.StreamStmt{
  73. Name: ast.StreamName("demo"),
  74. StreamFields: []ast.StreamField{
  75. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  76. },
  77. },
  78. data: []byte(`{"abc": 6}`),
  79. result: &xsql.Tuple{Message: xsql.Message{
  80. "abc": 6,
  81. },
  82. },
  83. },
  84. {
  85. stmt: &ast.StreamStmt{
  86. Name: ast.StreamName("demo"),
  87. StreamFields: nil,
  88. },
  89. data: []byte(`{"abc": 6}`),
  90. result: &xsql.Tuple{Message: xsql.Message{
  91. "abc": float64(6),
  92. },
  93. },
  94. },
  95. {
  96. stmt: &ast.StreamStmt{
  97. Name: ast.StreamName("demo"),
  98. StreamFields: []ast.StreamField{
  99. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  100. {Name: "dEf", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  101. },
  102. },
  103. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  104. result: &xsql.Tuple{Message: xsql.Message{
  105. "abc": float64(34),
  106. "dEf": "hello",
  107. },
  108. },
  109. },
  110. {
  111. stmt: &ast.StreamStmt{
  112. Name: ast.StreamName("demo"),
  113. StreamFields: nil,
  114. },
  115. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  116. result: &xsql.Tuple{Message: xsql.Message{
  117. "abc": float64(34),
  118. "def": "hello",
  119. "ghi": float64(50),
  120. },
  121. },
  122. },
  123. {
  124. stmt: &ast.StreamStmt{
  125. Name: ast.StreamName("demo"),
  126. StreamFields: []ast.StreamField{
  127. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  128. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  129. },
  130. },
  131. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  132. result: &xsql.Tuple{Message: xsql.Message{
  133. "abc": float64(34),
  134. "def": "hello",
  135. },
  136. },
  137. },
  138. {
  139. stmt: &ast.StreamStmt{
  140. Name: ast.StreamName("demo"),
  141. StreamFields: []ast.StreamField{
  142. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  143. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  144. },
  145. },
  146. data: []byte(`{"abc": 77, "def" : "hello"}`),
  147. result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
  148. },
  149. {
  150. stmt: &ast.StreamStmt{
  151. Name: ast.StreamName("demo"),
  152. StreamFields: []ast.StreamField{
  153. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  154. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  155. },
  156. },
  157. data: []byte(`{"a": {"b" : "hello"}}`),
  158. result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
  159. },
  160. {
  161. stmt: &ast.StreamStmt{
  162. Name: ast.StreamName("demo"),
  163. StreamFields: nil,
  164. },
  165. data: []byte(`{"a": {"b" : "hello"}}`),
  166. result: &xsql.Tuple{Message: xsql.Message{
  167. "a": map[string]interface{}{
  168. "b": "hello",
  169. },
  170. },
  171. },
  172. },
  173. //Rec type
  174. {
  175. stmt: &ast.StreamStmt{
  176. Name: ast.StreamName("demo"),
  177. StreamFields: []ast.StreamField{
  178. {Name: "a", FieldType: &ast.RecType{
  179. StreamFields: []ast.StreamField{
  180. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  181. },
  182. }},
  183. },
  184. },
  185. data: []byte(`{"a": {"b" : "hello"}}`),
  186. result: &xsql.Tuple{Message: xsql.Message{
  187. "a": map[string]interface{}{
  188. "b": "hello",
  189. },
  190. },
  191. },
  192. },
  193. {
  194. stmt: &ast.StreamStmt{
  195. Name: ast.StreamName("demo"),
  196. StreamFields: []ast.StreamField{
  197. {Name: "a", FieldType: &ast.RecType{
  198. StreamFields: []ast.StreamField{
  199. {Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  200. },
  201. }},
  202. },
  203. },
  204. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  205. result: &xsql.Tuple{Message: xsql.Message{
  206. "a": map[string]interface{}{
  207. "b": float64(32),
  208. },
  209. },
  210. },
  211. },
  212. {
  213. stmt: &ast.StreamStmt{
  214. Name: ast.StreamName("demo"),
  215. StreamFields: nil,
  216. },
  217. data: []byte(`{"a": {"b" : "32"}}`),
  218. result: &xsql.Tuple{Message: xsql.Message{
  219. "a": map[string]interface{}{
  220. "b": "32",
  221. },
  222. },
  223. },
  224. },
  225. //Array of complex type
  226. {
  227. stmt: &ast.StreamStmt{
  228. Name: ast.StreamName("demo"),
  229. StreamFields: []ast.StreamField{
  230. {Name: "a", FieldType: &ast.ArrayType{
  231. Type: ast.STRUCT,
  232. FieldType: &ast.RecType{
  233. StreamFields: []ast.StreamField{
  234. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  235. },
  236. },
  237. }},
  238. },
  239. },
  240. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  241. result: &xsql.Tuple{Message: xsql.Message{
  242. "a": []map[string]interface{}{
  243. {"b": "hello1"},
  244. {"b": "hello2"},
  245. },
  246. },
  247. },
  248. },
  249. {
  250. stmt: &ast.StreamStmt{
  251. Name: ast.StreamName("demo"),
  252. StreamFields: []ast.StreamField{
  253. {Name: "a", FieldType: &ast.ArrayType{
  254. Type: ast.STRUCT,
  255. FieldType: &ast.RecType{
  256. StreamFields: []ast.StreamField{
  257. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  258. },
  259. },
  260. }},
  261. },
  262. },
  263. data: []byte(`{"a": []}`),
  264. result: &xsql.Tuple{Message: xsql.Message{
  265. "a": make([]map[string]interface{}, 0),
  266. },
  267. },
  268. },
  269. {
  270. stmt: &ast.StreamStmt{
  271. Name: ast.StreamName("demo"),
  272. StreamFields: []ast.StreamField{
  273. {Name: "a", FieldType: &ast.ArrayType{
  274. Type: ast.STRUCT,
  275. FieldType: &ast.RecType{
  276. StreamFields: []ast.StreamField{
  277. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  278. },
  279. },
  280. }},
  281. },
  282. },
  283. data: []byte(`{"a": null}`),
  284. result: &xsql.Tuple{Message: xsql.Message{
  285. "a": []map[string]interface{}(nil),
  286. },
  287. },
  288. },
  289. {
  290. stmt: &ast.StreamStmt{
  291. Name: ast.StreamName("demo"),
  292. StreamFields: []ast.StreamField{
  293. {Name: "a", FieldType: &ast.ArrayType{
  294. Type: ast.STRUCT,
  295. FieldType: &ast.RecType{
  296. StreamFields: []ast.StreamField{
  297. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  298. },
  299. },
  300. }},
  301. },
  302. },
  303. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  304. result: &xsql.Tuple{Message: xsql.Message{
  305. "a": []map[string]interface{}{
  306. nil,
  307. {"b": "hello2"},
  308. },
  309. },
  310. },
  311. },
  312. {
  313. stmt: &ast.StreamStmt{
  314. Name: ast.StreamName("demo"),
  315. StreamFields: []ast.StreamField{
  316. {Name: "a", FieldType: &ast.ArrayType{
  317. Type: ast.ARRAY,
  318. FieldType: &ast.ArrayType{
  319. Type: ast.BIGINT,
  320. },
  321. }},
  322. },
  323. },
  324. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  325. result: &xsql.Tuple{Message: xsql.Message{
  326. "a": [][]int{
  327. {50, 60, 70},
  328. {66},
  329. {77},
  330. },
  331. },
  332. },
  333. },
  334. {
  335. stmt: &ast.StreamStmt{
  336. Name: ast.StreamName("demo"),
  337. StreamFields: []ast.StreamField{
  338. {Name: "a", FieldType: &ast.ArrayType{
  339. Type: ast.ARRAY,
  340. FieldType: &ast.ArrayType{
  341. Type: ast.BIGINT,
  342. },
  343. }},
  344. },
  345. },
  346. data: []byte(`{"a": [null, [66], [77]]}`),
  347. result: &xsql.Tuple{Message: xsql.Message{
  348. "a": [][]int{
  349. []int(nil),
  350. {66},
  351. {77},
  352. },
  353. },
  354. },
  355. },
  356. {
  357. stmt: &ast.StreamStmt{
  358. Name: ast.StreamName("demo"),
  359. StreamFields: nil,
  360. },
  361. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  362. result: &xsql.Tuple{Message: xsql.Message{
  363. "a": []interface{}{
  364. map[string]interface{}{"b": "hello1"},
  365. map[string]interface{}{"b": "hello2"},
  366. },
  367. },
  368. },
  369. },
  370. {
  371. stmt: &ast.StreamStmt{
  372. Name: ast.StreamName("demo"),
  373. StreamFields: []ast.StreamField{
  374. {Name: "a", FieldType: &ast.ArrayType{
  375. Type: ast.FLOAT,
  376. }},
  377. },
  378. },
  379. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  380. result: &xsql.Tuple{Message: xsql.Message{
  381. "a": []float64{
  382. 55,
  383. 77,
  384. },
  385. },
  386. },
  387. },
  388. {
  389. stmt: &ast.StreamStmt{
  390. Name: ast.StreamName("demo"),
  391. StreamFields: nil,
  392. },
  393. data: []byte(`{"a": [55, 77]}`),
  394. result: &xsql.Tuple{Message: xsql.Message{
  395. "a": []interface{}{
  396. float64(55),
  397. float64(77),
  398. },
  399. },
  400. },
  401. },
  402. //Rec of complex type
  403. {
  404. stmt: &ast.StreamStmt{
  405. Name: ast.StreamName("demo"),
  406. StreamFields: []ast.StreamField{
  407. {Name: "a", FieldType: &ast.RecType{
  408. StreamFields: []ast.StreamField{
  409. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  410. {Name: "c", FieldType: &ast.RecType{
  411. StreamFields: []ast.StreamField{
  412. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  413. },
  414. }},
  415. },
  416. }},
  417. },
  418. },
  419. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  420. result: &xsql.Tuple{Message: xsql.Message{
  421. "a": map[string]interface{}{
  422. "b": "hello",
  423. "c": map[string]interface{}{
  424. "d": 35,
  425. },
  426. },
  427. },
  428. },
  429. },
  430. {
  431. stmt: &ast.StreamStmt{
  432. Name: ast.StreamName("demo"),
  433. StreamFields: []ast.StreamField{
  434. {Name: "a", FieldType: &ast.RecType{
  435. StreamFields: []ast.StreamField{
  436. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  437. {Name: "c", FieldType: &ast.RecType{
  438. StreamFields: []ast.StreamField{
  439. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  440. },
  441. }},
  442. },
  443. }},
  444. },
  445. },
  446. data: []byte(`{"a": null}`),
  447. result: &xsql.Tuple{Message: xsql.Message{
  448. "a": map[string]interface{}(nil),
  449. },
  450. },
  451. },
  452. {
  453. stmt: &ast.StreamStmt{
  454. Name: ast.StreamName("demo"),
  455. StreamFields: []ast.StreamField{
  456. {Name: "a", FieldType: &ast.RecType{
  457. StreamFields: []ast.StreamField{
  458. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  459. {Name: "c", FieldType: &ast.ArrayType{
  460. Type: ast.FLOAT,
  461. }},
  462. },
  463. }},
  464. },
  465. },
  466. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  467. result: &xsql.Tuple{Message: xsql.Message{
  468. "a": map[string]interface{}{
  469. "b": "hello",
  470. "c": []float64{
  471. 35.2, 38.2,
  472. },
  473. },
  474. },
  475. },
  476. },
  477. {
  478. stmt: &ast.StreamStmt{
  479. Name: ast.StreamName("demo"),
  480. StreamFields: []ast.StreamField{
  481. {Name: "a", FieldType: &ast.RecType{
  482. StreamFields: []ast.StreamField{
  483. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  484. {Name: "c", FieldType: &ast.ArrayType{
  485. Type: ast.FLOAT,
  486. }},
  487. },
  488. }},
  489. },
  490. },
  491. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  492. result: &xsql.Tuple{Message: xsql.Message{
  493. "a": map[string]interface{}{
  494. "b": "hello",
  495. "c": []float64(nil),
  496. },
  497. },
  498. },
  499. },
  500. {
  501. stmt: &ast.StreamStmt{
  502. Name: ast.StreamName("demo"),
  503. StreamFields: []ast.StreamField{
  504. {Name: "a", FieldType: &ast.RecType{
  505. StreamFields: []ast.StreamField{
  506. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  507. {Name: "c", FieldType: &ast.ArrayType{
  508. Type: ast.FLOAT,
  509. }},
  510. },
  511. }},
  512. },
  513. },
  514. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  515. result: errors.New("error in preprocessor: fail to parse field c: invalid data type for [0], expect float but found <nil>(<nil>)"),
  516. },
  517. {
  518. stmt: &ast.StreamStmt{
  519. Name: ast.StreamName("demo"),
  520. StreamFields: nil,
  521. },
  522. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  523. result: &xsql.Tuple{Message: xsql.Message{
  524. "a": map[string]interface{}{
  525. "b": "hello",
  526. "c": map[string]interface{}{
  527. "d": 35.2,
  528. },
  529. },
  530. },
  531. },
  532. }, {
  533. stmt: &ast.StreamStmt{
  534. Name: ast.StreamName("demo"),
  535. StreamFields: []ast.StreamField{
  536. {Name: "a", FieldType: &ast.RecType{
  537. StreamFields: []ast.StreamField{
  538. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  539. },
  540. }},
  541. {Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  542. {Name: "c", FieldType: &ast.ArrayType{Type: ast.BIGINT}},
  543. },
  544. Options: &ast.Options{
  545. STRICT_VALIDATION: false,
  546. },
  547. },
  548. data: []byte(`{"a": {"d" : "hello"}}`),
  549. result: &xsql.Tuple{Message: xsql.Message{
  550. "a": map[string]interface{}{
  551. "b": "",
  552. },
  553. "b": 0.0,
  554. "c": []int(nil),
  555. },
  556. },
  557. },
  558. }
  559. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  560. defer conf.CloseLogger()
  561. contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
  562. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  563. for i, tt := range tests {
  564. pp := &Preprocessor{}
  565. if tt.stmt.Options != nil {
  566. pp.strictValidation = tt.stmt.Options.STRICT_VALIDATION
  567. } else {
  568. pp.strictValidation = true
  569. }
  570. pp.streamFields = convertFields(tt.stmt.StreamFields)
  571. dm := make(map[string]interface{})
  572. if e := json.Unmarshal(tt.data, &dm); e != nil {
  573. log.Fatal(e)
  574. return
  575. } else {
  576. tuple := &xsql.Tuple{Message: dm}
  577. fv, afv := xsql.NewFunctionValuersForOp(nil)
  578. result := pp.Apply(ctx, tuple, fv, afv)
  579. if !reflect.DeepEqual(tt.result, result) {
  580. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  581. }
  582. }
  583. }
  584. }
  585. func TestPreprocessorTime_Apply(t *testing.T) {
  586. var tests = []struct {
  587. stmt *ast.StreamStmt
  588. data []byte
  589. result interface{}
  590. }{
  591. {
  592. stmt: &ast.StreamStmt{
  593. Name: ast.StreamName("demo"),
  594. StreamFields: []ast.StreamField{
  595. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  596. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  597. },
  598. },
  599. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  600. result: &xsql.Tuple{Message: xsql.Message{
  601. "abc": cast.TimeFromUnixMilli(1568854515000),
  602. "def": cast.TimeFromUnixMilli(1568854573431),
  603. },
  604. },
  605. },
  606. {
  607. stmt: &ast.StreamStmt{
  608. Name: ast.StreamName("demo"),
  609. StreamFields: nil,
  610. },
  611. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  612. result: &xsql.Tuple{Message: xsql.Message{
  613. "abc": "2019-09-19T00:55:15.000Z",
  614. "def": float64(1568854573431),
  615. },
  616. },
  617. },
  618. {
  619. stmt: &ast.StreamStmt{
  620. Name: ast.StreamName("demo"),
  621. StreamFields: []ast.StreamField{
  622. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  623. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  624. },
  625. },
  626. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  627. result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  628. },
  629. {
  630. stmt: &ast.StreamStmt{
  631. Name: ast.StreamName("demo"),
  632. StreamFields: []ast.StreamField{
  633. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  634. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  635. },
  636. Options: &ast.Options{
  637. DATASOURCE: "users",
  638. FORMAT: "JSON",
  639. KEY: "USERID",
  640. CONF_KEY: "srv1",
  641. TYPE: "MQTT",
  642. TIMESTAMP: "USERID",
  643. TIMESTAMP_FORMAT: "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  644. },
  645. },
  646. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  647. result: &xsql.Tuple{Message: xsql.Message{
  648. "abc": cast.TimeFromUnixMilli(1568894115000),
  649. "def": cast.TimeFromUnixMilli(1568854573431),
  650. }},
  651. },
  652. //Array type
  653. {
  654. stmt: &ast.StreamStmt{
  655. Name: ast.StreamName("demo"),
  656. StreamFields: []ast.StreamField{
  657. {Name: "a", FieldType: &ast.ArrayType{
  658. Type: ast.DATETIME,
  659. }},
  660. },
  661. },
  662. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  663. result: &xsql.Tuple{Message: xsql.Message{
  664. "a": []time.Time{
  665. cast.TimeFromUnixMilli(1568854515123),
  666. cast.TimeFromUnixMilli(1568854573431),
  667. },
  668. },
  669. },
  670. },
  671. {
  672. stmt: &ast.StreamStmt{
  673. Name: ast.StreamName("demo"),
  674. StreamFields: []ast.StreamField{
  675. {Name: "a", FieldType: &ast.RecType{
  676. StreamFields: []ast.StreamField{
  677. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  678. {Name: "c", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  679. },
  680. }},
  681. },
  682. },
  683. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  684. result: &xsql.Tuple{Message: xsql.Message{
  685. "a": map[string]interface{}{
  686. "b": "hello",
  687. "c": cast.TimeFromUnixMilli(1568854515000),
  688. },
  689. },
  690. },
  691. },
  692. }
  693. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  694. defer conf.CloseLogger()
  695. contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
  696. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  697. for i, tt := range tests {
  698. pp := &Preprocessor{}
  699. pp.streamFields = convertFields(tt.stmt.StreamFields)
  700. if tt.stmt.Options != nil {
  701. pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
  702. }
  703. dm := make(map[string]interface{})
  704. if e := json.Unmarshal(tt.data, &dm); e != nil {
  705. log.Fatal(e)
  706. return
  707. } else {
  708. tuple := &xsql.Tuple{Message: dm}
  709. fv, afv := xsql.NewFunctionValuersForOp(nil)
  710. result := pp.Apply(ctx, tuple, fv, afv)
  711. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  712. if rt, ok := result.(*xsql.Tuple); ok {
  713. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  714. rt.Message["abc"] = rtt.UTC()
  715. }
  716. }
  717. if !reflect.DeepEqual(tt.result, result) {
  718. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  719. }
  720. }
  721. }
  722. }
  723. func convertFields(o ast.StreamFields) []interface{} {
  724. if o == nil {
  725. return nil
  726. }
  727. fields := make([]interface{}, len(o))
  728. for i := range o {
  729. fields[i] = &o[i]
  730. }
  731. return fields
  732. }
  733. func TestPreprocessorEventtime_Apply(t *testing.T) {
  734. var tests = []struct {
  735. stmt *ast.StreamStmt
  736. data []byte
  737. result interface{}
  738. }{
  739. //Basic type
  740. {
  741. stmt: &ast.StreamStmt{
  742. Name: ast.StreamName("demo"),
  743. StreamFields: []ast.StreamField{
  744. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  745. },
  746. Options: &ast.Options{
  747. DATASOURCE: "users",
  748. FORMAT: "JSON",
  749. KEY: "USERID",
  750. CONF_KEY: "srv1",
  751. TYPE: "MQTT",
  752. TIMESTAMP: "abc",
  753. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  754. },
  755. },
  756. data: []byte(`{"abc": 1568854515000}`),
  757. result: &xsql.Tuple{Message: xsql.Message{
  758. "abc": 1568854515000,
  759. }, Timestamp: 1568854515000,
  760. },
  761. },
  762. {
  763. stmt: &ast.StreamStmt{
  764. Name: ast.StreamName("demo"),
  765. StreamFields: nil,
  766. Options: &ast.Options{
  767. DATASOURCE: "users",
  768. FORMAT: "JSON",
  769. KEY: "USERID",
  770. CONF_KEY: "srv1",
  771. TYPE: "MQTT",
  772. TIMESTAMP: "abc",
  773. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  774. },
  775. },
  776. data: []byte(`{"abc": 1568854515000}`),
  777. result: &xsql.Tuple{Message: xsql.Message{
  778. "abc": float64(1568854515000),
  779. }, Timestamp: 1568854515000,
  780. },
  781. },
  782. {
  783. stmt: &ast.StreamStmt{
  784. Name: ast.StreamName("demo"),
  785. StreamFields: []ast.StreamField{
  786. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  787. },
  788. Options: &ast.Options{
  789. DATASOURCE: "users",
  790. TIMESTAMP: "abc",
  791. },
  792. },
  793. data: []byte(`{"abc": true}`),
  794. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  795. },
  796. {
  797. stmt: &ast.StreamStmt{
  798. Name: ast.StreamName("demo"),
  799. StreamFields: []ast.StreamField{
  800. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  801. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  802. },
  803. Options: &ast.Options{
  804. DATASOURCE: "users",
  805. TIMESTAMP: "def",
  806. },
  807. },
  808. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  809. result: &xsql.Tuple{Message: xsql.Message{
  810. "abc": float64(34),
  811. "def": "2019-09-23T02:47:29.754Z",
  812. }, Timestamp: int64(1569206849754),
  813. },
  814. },
  815. {
  816. stmt: &ast.StreamStmt{
  817. Name: ast.StreamName("demo"),
  818. StreamFields: []ast.StreamField{
  819. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  820. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  821. },
  822. Options: &ast.Options{
  823. DATASOURCE: "users",
  824. TIMESTAMP: "abc",
  825. },
  826. },
  827. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  828. result: &xsql.Tuple{Message: xsql.Message{
  829. "abc": cast.TimeFromUnixMilli(1568854515000),
  830. "def": cast.TimeFromUnixMilli(1568854573431),
  831. }, Timestamp: int64(1568854515000),
  832. },
  833. },
  834. {
  835. stmt: &ast.StreamStmt{
  836. Name: ast.StreamName("demo"),
  837. StreamFields: []ast.StreamField{
  838. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  839. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  840. },
  841. Options: &ast.Options{
  842. DATASOURCE: "users",
  843. TIMESTAMP: "def",
  844. TIMESTAMP_FORMAT: "yyyy-MM-dd'AT'HH:mm:ss",
  845. },
  846. },
  847. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  848. result: &xsql.Tuple{Message: xsql.Message{
  849. "abc": float64(34),
  850. "def": "2019-09-23AT02:47:29",
  851. }, Timestamp: int64(1569206849000),
  852. },
  853. },
  854. {
  855. stmt: &ast.StreamStmt{
  856. Name: ast.StreamName("demo"),
  857. StreamFields: []ast.StreamField{
  858. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  859. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  860. },
  861. Options: &ast.Options{
  862. DATASOURCE: "users",
  863. TIMESTAMP: "def",
  864. TIMESTAMP_FORMAT: "yyyy-MM-ddaHH:mm:ss",
  865. },
  866. },
  867. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  868. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
  869. },
  870. }
  871. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  872. defer conf.CloseLogger()
  873. contextLogger := conf.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  874. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  875. for i, tt := range tests {
  876. pp := &Preprocessor{
  877. defaultFieldProcessor: defaultFieldProcessor{
  878. streamFields: convertFields(tt.stmt.StreamFields),
  879. isBinary: false,
  880. timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
  881. },
  882. isEventTime: true,
  883. timestampField: tt.stmt.Options.TIMESTAMP,
  884. }
  885. dm := make(map[string]interface{})
  886. if e := json.Unmarshal(tt.data, &dm); e != nil {
  887. log.Fatal(e)
  888. return
  889. } else {
  890. tuple := &xsql.Tuple{Message: dm}
  891. fv, afv := xsql.NewFunctionValuersForOp(nil)
  892. result := pp.Apply(ctx, tuple, fv, afv)
  893. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  894. if rt, ok := result.(*xsql.Tuple); ok {
  895. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  896. rt.Message["abc"] = rtt.UTC()
  897. }
  898. }
  899. if !reflect.DeepEqual(tt.result, result) {
  900. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  901. }
  902. }
  903. }
  904. }
  905. func TestPreprocessorError(t *testing.T) {
  906. tests := []struct {
  907. stmt *ast.StreamStmt
  908. data []byte
  909. result interface{}
  910. }{
  911. {
  912. stmt: &ast.StreamStmt{
  913. Name: ast.StreamName("demo"),
  914. StreamFields: []ast.StreamField{
  915. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  916. },
  917. },
  918. data: []byte(`{"abc": "dafsad"}`),
  919. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
  920. }, {
  921. stmt: &ast.StreamStmt{
  922. Name: ast.StreamName("demo"),
  923. StreamFields: []ast.StreamField{
  924. {Name: "a", FieldType: &ast.RecType{
  925. StreamFields: []ast.StreamField{
  926. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  927. },
  928. }},
  929. },
  930. },
  931. data: []byte(`{"a": {"d" : "hello"}}`),
  932. result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
  933. }, {
  934. stmt: &ast.StreamStmt{
  935. Name: ast.StreamName("demo"),
  936. StreamFields: []ast.StreamField{
  937. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  938. },
  939. Options: &ast.Options{
  940. DATASOURCE: "users",
  941. FORMAT: "JSON",
  942. KEY: "USERID",
  943. CONF_KEY: "srv1",
  944. TYPE: "MQTT",
  945. TIMESTAMP: "abc",
  946. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  947. },
  948. },
  949. data: []byte(`{"abc": "not a time"}`),
  950. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
  951. },
  952. }
  953. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  954. defer conf.CloseLogger()
  955. contextLogger := conf.Log.WithField("rule", "TestPreprocessorError")
  956. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  957. for i, tt := range tests {
  958. pp := &Preprocessor{}
  959. pp.strictValidation = true
  960. pp.streamFields = convertFields(tt.stmt.StreamFields)
  961. dm := make(map[string]interface{})
  962. if e := json.Unmarshal(tt.data, &dm); e != nil {
  963. log.Fatal(e)
  964. return
  965. } else {
  966. tuple := &xsql.Tuple{Message: dm}
  967. fv, afv := xsql.NewFunctionValuersForOp(nil)
  968. result := pp.Apply(ctx, tuple, fv, afv)
  969. if !reflect.DeepEqual(tt.result, result) {
  970. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  971. }
  972. }
  973. }
  974. }
  975. func TestPreprocessorForBinary(t *testing.T) {
  976. docsFolder, err := conf.GetLoc("docs/")
  977. if err != nil {
  978. t.Errorf("Cannot find docs folder: %v", err)
  979. }
  980. image, err := ioutil.ReadFile(path.Join(docsFolder, "cover.jpg"))
  981. if err != nil {
  982. t.Errorf("Cannot read image: %v", err)
  983. }
  984. b64img := base64.StdEncoding.EncodeToString(image)
  985. //TODO test bytea type conversion to string or else
  986. var tests = []struct {
  987. stmt *ast.StreamStmt
  988. data []byte
  989. isBinary bool
  990. result interface{}
  991. }{
  992. {
  993. stmt: &ast.StreamStmt{
  994. Name: ast.StreamName("demo"),
  995. StreamFields: nil,
  996. },
  997. data: image,
  998. isBinary: true,
  999. result: &xsql.Tuple{Message: xsql.Message{
  1000. "self": image,
  1001. },
  1002. },
  1003. },
  1004. {
  1005. stmt: &ast.StreamStmt{
  1006. Name: ast.StreamName("demo"),
  1007. StreamFields: []ast.StreamField{
  1008. {Name: "img", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  1009. },
  1010. },
  1011. data: image,
  1012. isBinary: true,
  1013. result: &xsql.Tuple{Message: xsql.Message{
  1014. "img": image,
  1015. },
  1016. },
  1017. },
  1018. {
  1019. stmt: &ast.StreamStmt{
  1020. Name: ast.StreamName("demo"),
  1021. StreamFields: []ast.StreamField{
  1022. {Name: "a", FieldType: &ast.RecType{
  1023. StreamFields: []ast.StreamField{
  1024. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  1025. },
  1026. }},
  1027. },
  1028. },
  1029. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  1030. result: &xsql.Tuple{Message: xsql.Message{
  1031. "a": map[string]interface{}{
  1032. "b": image,
  1033. },
  1034. },
  1035. },
  1036. },
  1037. {
  1038. stmt: &ast.StreamStmt{
  1039. Name: ast.StreamName("demo"),
  1040. StreamFields: []ast.StreamField{
  1041. {Name: "a", FieldType: &ast.ArrayType{
  1042. Type: ast.BYTEA,
  1043. }},
  1044. },
  1045. },
  1046. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  1047. result: &xsql.Tuple{Message: xsql.Message{
  1048. "a": [][]byte{
  1049. image,
  1050. },
  1051. },
  1052. },
  1053. },
  1054. {
  1055. stmt: &ast.StreamStmt{
  1056. Name: ast.StreamName("demo"),
  1057. StreamFields: []ast.StreamField{
  1058. {Name: "a", FieldType: &ast.ArrayType{
  1059. Type: ast.STRUCT,
  1060. FieldType: &ast.RecType{
  1061. StreamFields: []ast.StreamField{
  1062. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  1063. },
  1064. },
  1065. }},
  1066. },
  1067. },
  1068. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  1069. result: &xsql.Tuple{Message: xsql.Message{
  1070. "a": []map[string]interface{}{
  1071. {"b": image},
  1072. },
  1073. },
  1074. },
  1075. },
  1076. }
  1077. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1078. defer conf.CloseLogger()
  1079. contextLogger := conf.Log.WithField("rule", "TestPreprocessorForBinary")
  1080. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1081. for i, tt := range tests {
  1082. pp := &Preprocessor{}
  1083. pp.streamFields = convertFields(tt.stmt.StreamFields)
  1084. pp.isBinary = tt.isBinary
  1085. format := "json"
  1086. if tt.isBinary {
  1087. format = "binary"
  1088. }
  1089. if dm, e := message.Decode(tt.data, format); e != nil {
  1090. log.Fatal(e)
  1091. return
  1092. } else {
  1093. tuple := &xsql.Tuple{Message: dm}
  1094. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1095. result := pp.Apply(ctx, tuple, fv, afv)
  1096. if !reflect.DeepEqual(tt.result, result) {
  1097. t.Errorf("%d. %q\n\nresult mismatch", i, tuple)
  1098. }
  1099. }
  1100. }
  1101. }