preprocessor_test.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "log"
  21. "os"
  22. "path"
  23. "reflect"
  24. "testing"
  25. "time"
  26. "github.com/lf-edge/ekuiper/internal/conf"
  27. "github.com/lf-edge/ekuiper/internal/converter"
  28. "github.com/lf-edge/ekuiper/internal/topo/context"
  29. "github.com/lf-edge/ekuiper/internal/xsql"
  30. "github.com/lf-edge/ekuiper/pkg/ast"
  31. "github.com/lf-edge/ekuiper/pkg/cast"
  32. "github.com/lf-edge/ekuiper/pkg/message"
  33. )
  34. func TestPreprocessor_Apply(t *testing.T) {
  35. var tests = []struct {
  36. stmt *ast.StreamStmt
  37. data []byte
  38. result interface{}
  39. }{
  40. //Basic type
  41. { // 0
  42. stmt: &ast.StreamStmt{
  43. Name: ast.StreamName("demo"),
  44. StreamFields: []ast.StreamField{
  45. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  46. },
  47. },
  48. data: []byte(`{"a": 6}`),
  49. result: errors.New("error in preprocessor: field abc is not found"),
  50. },
  51. { // 1
  52. stmt: &ast.StreamStmt{
  53. Name: ast.StreamName("demo"),
  54. StreamFields: []ast.StreamField{
  55. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  56. },
  57. },
  58. data: []byte(`{"abc": null}`),
  59. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert <nil>(<nil>) to int64"),
  60. },
  61. { // 2
  62. stmt: &ast.StreamStmt{
  63. Name: ast.StreamName("demo"),
  64. StreamFields: nil,
  65. },
  66. data: []byte(`{"a": 6}`),
  67. result: &xsql.Tuple{Message: xsql.Message{
  68. "a": float64(6),
  69. },
  70. },
  71. },
  72. { // 3
  73. stmt: &ast.StreamStmt{
  74. Name: ast.StreamName("demo"),
  75. StreamFields: []ast.StreamField{
  76. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  77. },
  78. },
  79. data: []byte(`{"abc": 6}`),
  80. result: &xsql.Tuple{Message: xsql.Message{
  81. "abc": int64(6),
  82. },
  83. },
  84. },
  85. { // 4
  86. stmt: &ast.StreamStmt{
  87. Name: ast.StreamName("demo"),
  88. StreamFields: nil,
  89. },
  90. data: []byte(`{"abc": 6}`),
  91. result: &xsql.Tuple{Message: xsql.Message{
  92. "abc": float64(6),
  93. },
  94. },
  95. },
  96. { // 5
  97. stmt: &ast.StreamStmt{
  98. Name: ast.StreamName("demo"),
  99. StreamFields: []ast.StreamField{
  100. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  101. {Name: "dEf", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  102. },
  103. },
  104. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  105. result: &xsql.Tuple{Message: xsql.Message{
  106. "abc": float64(34),
  107. "dEf": "hello",
  108. "def": "hello",
  109. "ghi": float64(50),
  110. },
  111. },
  112. },
  113. { // 6
  114. stmt: &ast.StreamStmt{
  115. Name: ast.StreamName("demo"),
  116. StreamFields: nil,
  117. },
  118. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  119. result: &xsql.Tuple{Message: xsql.Message{
  120. "abc": float64(34),
  121. "def": "hello",
  122. "ghi": float64(50),
  123. },
  124. },
  125. },
  126. { // 7
  127. stmt: &ast.StreamStmt{
  128. Name: ast.StreamName("demo"),
  129. StreamFields: []ast.StreamField{
  130. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  131. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  132. },
  133. },
  134. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  135. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(34) to float64"),
  136. },
  137. { // 8
  138. stmt: &ast.StreamStmt{
  139. Name: ast.StreamName("demo"),
  140. StreamFields: []ast.StreamField{
  141. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  142. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  143. },
  144. },
  145. data: []byte(`{"abc": 77, "def" : "hello"}`),
  146. result: errors.New("error in preprocessor: field def type mismatch: cannot convert string(hello) to bool"),
  147. },
  148. { // 9
  149. stmt: &ast.StreamStmt{
  150. Name: ast.StreamName("demo"),
  151. StreamFields: []ast.StreamField{
  152. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  153. },
  154. },
  155. data: []byte(`{"a": {"b" : "hello"}}`),
  156. result: errors.New("error in preprocessor: field abc is not found"),
  157. },
  158. { // 10
  159. stmt: &ast.StreamStmt{
  160. Name: ast.StreamName("demo"),
  161. StreamFields: nil,
  162. },
  163. data: []byte(`{"a": {"b" : "hello"}}`),
  164. result: &xsql.Tuple{Message: xsql.Message{
  165. "a": map[string]interface{}{
  166. "b": "hello",
  167. },
  168. },
  169. },
  170. },
  171. //Rec type
  172. { // 11
  173. stmt: &ast.StreamStmt{
  174. Name: ast.StreamName("demo"),
  175. StreamFields: []ast.StreamField{
  176. {Name: "a", FieldType: &ast.RecType{
  177. StreamFields: []ast.StreamField{
  178. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  179. },
  180. }},
  181. },
  182. },
  183. data: []byte(`{"a": {"b" : "hello"}}`),
  184. result: &xsql.Tuple{Message: xsql.Message{
  185. "a": map[string]interface{}{
  186. "b": "hello",
  187. },
  188. },
  189. },
  190. },
  191. { // 12
  192. stmt: &ast.StreamStmt{
  193. Name: ast.StreamName("demo"),
  194. StreamFields: []ast.StreamField{
  195. {Name: "a", FieldType: &ast.RecType{
  196. StreamFields: []ast.StreamField{
  197. {Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  198. },
  199. }},
  200. },
  201. },
  202. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  203. result: errors.New("error in preprocessor: field a type mismatch: field b type mismatch: cannot convert string(32) to float64"),
  204. },
  205. { // 13
  206. stmt: &ast.StreamStmt{
  207. Name: ast.StreamName("demo"),
  208. StreamFields: nil,
  209. },
  210. data: []byte(`{"a": {"b" : "32"}}`),
  211. result: &xsql.Tuple{Message: xsql.Message{
  212. "a": map[string]interface{}{
  213. "b": "32",
  214. },
  215. },
  216. },
  217. },
  218. //Array of complex type
  219. { // 14
  220. stmt: &ast.StreamStmt{
  221. Name: ast.StreamName("demo"),
  222. StreamFields: []ast.StreamField{
  223. {Name: "a", FieldType: &ast.ArrayType{
  224. Type: ast.STRUCT,
  225. FieldType: &ast.RecType{
  226. StreamFields: []ast.StreamField{
  227. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  228. },
  229. },
  230. }},
  231. },
  232. },
  233. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  234. result: &xsql.Tuple{Message: xsql.Message{
  235. "a": []interface{}{
  236. map[string]interface{}{"b": "hello1"},
  237. map[string]interface{}{"b": "hello2"},
  238. },
  239. },
  240. },
  241. },
  242. { // 15
  243. stmt: &ast.StreamStmt{
  244. Name: ast.StreamName("demo"),
  245. StreamFields: []ast.StreamField{
  246. {Name: "a", FieldType: &ast.ArrayType{
  247. Type: ast.STRUCT,
  248. FieldType: &ast.RecType{
  249. StreamFields: []ast.StreamField{
  250. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  251. },
  252. },
  253. }},
  254. },
  255. },
  256. data: []byte(`{"a": []}`),
  257. result: &xsql.Tuple{Message: xsql.Message{
  258. "a": make([]interface{}, 0),
  259. },
  260. },
  261. },
  262. { // 16
  263. stmt: &ast.StreamStmt{
  264. Name: ast.StreamName("demo"),
  265. StreamFields: []ast.StreamField{
  266. {Name: "a", FieldType: &ast.ArrayType{
  267. Type: ast.STRUCT,
  268. FieldType: &ast.RecType{
  269. StreamFields: []ast.StreamField{
  270. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  271. },
  272. },
  273. }},
  274. },
  275. },
  276. data: []byte(`{"a": null}`),
  277. result: &xsql.Tuple{Message: xsql.Message{
  278. "a": []interface{}(nil),
  279. },
  280. },
  281. },
  282. { // 17
  283. stmt: &ast.StreamStmt{
  284. Name: ast.StreamName("demo"),
  285. StreamFields: []ast.StreamField{
  286. {Name: "a", FieldType: &ast.ArrayType{
  287. Type: ast.STRUCT,
  288. FieldType: &ast.RecType{
  289. StreamFields: []ast.StreamField{
  290. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  291. },
  292. },
  293. }},
  294. },
  295. },
  296. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  297. result: &xsql.Tuple{Message: xsql.Message{
  298. "a": []interface{}{
  299. map[string]interface{}(nil),
  300. map[string]interface{}{"b": "hello2"},
  301. },
  302. },
  303. },
  304. },
  305. { // 18
  306. stmt: &ast.StreamStmt{
  307. Name: ast.StreamName("demo"),
  308. StreamFields: []ast.StreamField{
  309. {Name: "a", FieldType: &ast.ArrayType{
  310. Type: ast.ARRAY,
  311. FieldType: &ast.ArrayType{
  312. Type: ast.BIGINT,
  313. },
  314. }},
  315. },
  316. },
  317. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  318. result: &xsql.Tuple{Message: xsql.Message{
  319. "a": []interface{}{
  320. []interface{}{int64(50), int64(60), int64(70)},
  321. []interface{}{int64(66)},
  322. []interface{}{int64(77)},
  323. },
  324. },
  325. },
  326. },
  327. { // 19
  328. stmt: &ast.StreamStmt{
  329. Name: ast.StreamName("demo"),
  330. StreamFields: []ast.StreamField{
  331. {Name: "a", FieldType: &ast.ArrayType{
  332. Type: ast.ARRAY,
  333. FieldType: &ast.ArrayType{
  334. Type: ast.BIGINT,
  335. },
  336. }},
  337. },
  338. },
  339. data: []byte(`{"a": [null, [66], [77]]}`),
  340. result: &xsql.Tuple{Message: xsql.Message{
  341. "a": []interface{}{
  342. []interface{}(nil),
  343. []interface{}{int64(66)},
  344. []interface{}{int64(77)},
  345. },
  346. },
  347. },
  348. },
  349. { // 20
  350. stmt: &ast.StreamStmt{
  351. Name: ast.StreamName("demo"),
  352. StreamFields: nil,
  353. },
  354. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  355. result: &xsql.Tuple{Message: xsql.Message{
  356. "a": []interface{}{
  357. map[string]interface{}{"b": "hello1"},
  358. map[string]interface{}{"b": "hello2"},
  359. },
  360. },
  361. },
  362. },
  363. { // 21
  364. stmt: &ast.StreamStmt{
  365. Name: ast.StreamName("demo"),
  366. StreamFields: []ast.StreamField{
  367. {Name: "a", FieldType: &ast.ArrayType{
  368. Type: ast.FLOAT,
  369. }},
  370. },
  371. },
  372. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  373. result: errors.New("error in preprocessor: field a type mismatch: expect array but got [\"55\", \"77\"]"),
  374. },
  375. { // 22
  376. stmt: &ast.StreamStmt{
  377. Name: ast.StreamName("demo"),
  378. StreamFields: nil,
  379. },
  380. data: []byte(`{"a": [55, 77]}`),
  381. result: &xsql.Tuple{Message: xsql.Message{
  382. "a": []interface{}{
  383. float64(55),
  384. float64(77),
  385. },
  386. },
  387. },
  388. },
  389. //Rec of complex type
  390. { // 23
  391. stmt: &ast.StreamStmt{
  392. Name: ast.StreamName("demo"),
  393. StreamFields: []ast.StreamField{
  394. {Name: "a", FieldType: &ast.RecType{
  395. StreamFields: []ast.StreamField{
  396. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  397. {Name: "c", FieldType: &ast.RecType{
  398. StreamFields: []ast.StreamField{
  399. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  400. },
  401. }},
  402. },
  403. }},
  404. },
  405. },
  406. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  407. result: &xsql.Tuple{Message: xsql.Message{
  408. "a": map[string]interface{}{
  409. "b": "hello",
  410. "c": map[string]interface{}{
  411. "d": int64(35),
  412. },
  413. },
  414. },
  415. },
  416. },
  417. { // 24
  418. stmt: &ast.StreamStmt{
  419. Name: ast.StreamName("demo"),
  420. StreamFields: []ast.StreamField{
  421. {Name: "a", FieldType: &ast.RecType{
  422. StreamFields: []ast.StreamField{
  423. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  424. {Name: "c", FieldType: &ast.RecType{
  425. StreamFields: []ast.StreamField{
  426. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  427. },
  428. }},
  429. },
  430. }},
  431. },
  432. },
  433. data: []byte(`{"a": null}`),
  434. result: &xsql.Tuple{Message: xsql.Message{
  435. "a": map[string]interface{}(nil),
  436. },
  437. },
  438. },
  439. { // 25
  440. stmt: &ast.StreamStmt{
  441. Name: ast.StreamName("demo"),
  442. StreamFields: []ast.StreamField{
  443. {Name: "a", FieldType: &ast.RecType{
  444. StreamFields: []ast.StreamField{
  445. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  446. {Name: "c", FieldType: &ast.ArrayType{
  447. Type: ast.FLOAT,
  448. }},
  449. },
  450. }},
  451. },
  452. },
  453. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  454. result: &xsql.Tuple{Message: xsql.Message{
  455. "a": map[string]interface{}{
  456. "b": "hello",
  457. "c": []interface{}{
  458. 35.2, 38.2,
  459. },
  460. },
  461. },
  462. },
  463. },
  464. { // 26
  465. stmt: &ast.StreamStmt{
  466. Name: ast.StreamName("demo"),
  467. StreamFields: []ast.StreamField{
  468. {Name: "a", FieldType: &ast.RecType{
  469. StreamFields: []ast.StreamField{
  470. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  471. {Name: "c", FieldType: &ast.ArrayType{
  472. Type: ast.FLOAT,
  473. }},
  474. },
  475. }},
  476. },
  477. },
  478. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  479. result: &xsql.Tuple{Message: xsql.Message{
  480. "a": map[string]interface{}{
  481. "b": "hello",
  482. "c": []interface{}(nil),
  483. },
  484. },
  485. },
  486. },
  487. { // 27
  488. stmt: &ast.StreamStmt{
  489. Name: ast.StreamName("demo"),
  490. StreamFields: []ast.StreamField{
  491. {Name: "a", FieldType: &ast.RecType{
  492. StreamFields: []ast.StreamField{
  493. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  494. {Name: "c", FieldType: &ast.ArrayType{
  495. Type: ast.FLOAT,
  496. }},
  497. },
  498. }},
  499. },
  500. },
  501. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  502. result: errors.New("error in preprocessor: field a type mismatch: field c type mismatch: array element type mismatch: cannot convert <nil>(<nil>) to float64"),
  503. },
  504. { // 28
  505. stmt: &ast.StreamStmt{
  506. Name: ast.StreamName("demo"),
  507. StreamFields: nil,
  508. },
  509. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  510. result: &xsql.Tuple{Message: xsql.Message{
  511. "a": map[string]interface{}{
  512. "b": "hello",
  513. "c": map[string]interface{}{
  514. "d": 35.2,
  515. },
  516. },
  517. },
  518. },
  519. },
  520. }
  521. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  522. defer conf.CloseLogger()
  523. contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
  524. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  525. for i, tt := range tests {
  526. pp := &Preprocessor{checkSchema: true}
  527. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  528. dm := make(map[string]interface{})
  529. if e := json.Unmarshal(tt.data, &dm); e != nil {
  530. log.Fatal(e)
  531. return
  532. } else {
  533. tuple := &xsql.Tuple{Message: dm}
  534. fv, afv := xsql.NewFunctionValuersForOp(nil)
  535. result := pp.Apply(ctx, tuple, fv, afv)
  536. if !reflect.DeepEqual(tt.result, result) {
  537. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  538. }
  539. }
  540. }
  541. }
  542. func TestPreprocessorTime_Apply(t *testing.T) {
  543. var tests = []struct {
  544. stmt *ast.StreamStmt
  545. data []byte
  546. result interface{}
  547. }{
  548. { // 0
  549. stmt: &ast.StreamStmt{
  550. Name: ast.StreamName("demo"),
  551. StreamFields: []ast.StreamField{
  552. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  553. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  554. },
  555. },
  556. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  557. result: &xsql.Tuple{Message: xsql.Message{
  558. "abc": cast.TimeFromUnixMilli(1568854515000),
  559. "def": cast.TimeFromUnixMilli(1568854573431),
  560. },
  561. },
  562. },
  563. { // 1
  564. stmt: &ast.StreamStmt{
  565. Name: ast.StreamName("demo"),
  566. StreamFields: nil,
  567. },
  568. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  569. result: &xsql.Tuple{Message: xsql.Message{
  570. "abc": "2019-09-19T00:55:15.000Z",
  571. "def": float64(1568854573431),
  572. },
  573. },
  574. },
  575. { // 2
  576. stmt: &ast.StreamStmt{
  577. Name: ast.StreamName("demo"),
  578. StreamFields: []ast.StreamField{
  579. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  580. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  581. },
  582. },
  583. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  584. result: errors.New("error in preprocessor: field abc type mismatch: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  585. },
  586. { // 3
  587. stmt: &ast.StreamStmt{
  588. Name: ast.StreamName("demo"),
  589. StreamFields: []ast.StreamField{
  590. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  591. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  592. },
  593. Options: &ast.Options{
  594. DATASOURCE: "users",
  595. FORMAT: "JSON",
  596. KEY: "USERID",
  597. CONF_KEY: "srv1",
  598. TYPE: "MQTT",
  599. TIMESTAMP: "USERID",
  600. TIMESTAMP_FORMAT: "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  601. },
  602. },
  603. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  604. result: &xsql.Tuple{Message: xsql.Message{
  605. "abc": cast.TimeFromUnixMilli(1568894115000),
  606. "def": cast.TimeFromUnixMilli(1568854573431),
  607. }},
  608. },
  609. //Array type
  610. { // 4
  611. stmt: &ast.StreamStmt{
  612. Name: ast.StreamName("demo"),
  613. StreamFields: []ast.StreamField{
  614. {Name: "a", FieldType: &ast.ArrayType{
  615. Type: ast.DATETIME,
  616. }},
  617. },
  618. },
  619. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  620. result: &xsql.Tuple{Message: xsql.Message{
  621. "a": []interface{}{
  622. cast.TimeFromUnixMilli(1568854515123),
  623. cast.TimeFromUnixMilli(1568854573431),
  624. },
  625. },
  626. },
  627. },
  628. { // 5
  629. stmt: &ast.StreamStmt{
  630. Name: ast.StreamName("demo"),
  631. StreamFields: []ast.StreamField{
  632. {Name: "a", FieldType: &ast.RecType{
  633. StreamFields: []ast.StreamField{
  634. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  635. {Name: "c", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  636. },
  637. }},
  638. },
  639. },
  640. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  641. result: &xsql.Tuple{Message: xsql.Message{
  642. "a": map[string]interface{}{
  643. "b": "hello",
  644. "c": cast.TimeFromUnixMilli(1568854515000),
  645. },
  646. },
  647. },
  648. },
  649. }
  650. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  651. defer conf.CloseLogger()
  652. contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
  653. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  654. for i, tt := range tests {
  655. pp := &Preprocessor{checkSchema: true}
  656. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  657. if tt.stmt.Options != nil {
  658. pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
  659. }
  660. dm := make(map[string]interface{})
  661. if e := json.Unmarshal(tt.data, &dm); e != nil {
  662. log.Fatal(e)
  663. return
  664. } else {
  665. tuple := &xsql.Tuple{Message: dm}
  666. fv, afv := xsql.NewFunctionValuersForOp(nil)
  667. result := pp.Apply(ctx, tuple, fv, afv)
  668. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  669. if rt, ok := result.(*xsql.Tuple); ok {
  670. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  671. rt.Message["abc"] = rtt.UTC()
  672. }
  673. }
  674. if !reflect.DeepEqual(tt.result, result) {
  675. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  676. }
  677. }
  678. }
  679. }
  680. func convertFields(o ast.StreamFields) []interface{} {
  681. if o == nil {
  682. return nil
  683. }
  684. fields := make([]interface{}, len(o))
  685. for i := range o {
  686. fields[i] = &o[i]
  687. }
  688. return fields
  689. }
  690. func TestPreprocessorEventtime_Apply(t *testing.T) {
  691. var tests = []struct {
  692. stmt *ast.StreamStmt
  693. data []byte
  694. result interface{}
  695. }{
  696. //Basic type
  697. { // 0
  698. stmt: &ast.StreamStmt{
  699. Name: ast.StreamName("demo"),
  700. StreamFields: []ast.StreamField{
  701. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  702. },
  703. Options: &ast.Options{
  704. DATASOURCE: "users",
  705. FORMAT: "JSON",
  706. KEY: "USERID",
  707. CONF_KEY: "srv1",
  708. TYPE: "MQTT",
  709. TIMESTAMP: "abc",
  710. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  711. },
  712. },
  713. data: []byte(`{"abc": 1568854515000}`),
  714. result: &xsql.Tuple{Message: xsql.Message{
  715. "abc": int64(1568854515000),
  716. }, Timestamp: 1568854515000,
  717. },
  718. },
  719. { // 1
  720. stmt: &ast.StreamStmt{
  721. Name: ast.StreamName("demo"),
  722. StreamFields: nil,
  723. Options: &ast.Options{
  724. DATASOURCE: "users",
  725. FORMAT: "JSON",
  726. KEY: "USERID",
  727. CONF_KEY: "srv1",
  728. TYPE: "MQTT",
  729. TIMESTAMP: "abc",
  730. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  731. },
  732. },
  733. data: []byte(`{"abc": 1568854515000}`),
  734. result: &xsql.Tuple{Message: xsql.Message{
  735. "abc": float64(1568854515000),
  736. }, Timestamp: 1568854515000,
  737. },
  738. },
  739. { // 2
  740. stmt: &ast.StreamStmt{
  741. Name: ast.StreamName("demo"),
  742. StreamFields: []ast.StreamField{
  743. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  744. },
  745. Options: &ast.Options{
  746. DATASOURCE: "users",
  747. TIMESTAMP: "abc",
  748. },
  749. },
  750. data: []byte(`{"abc": true}`),
  751. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  752. },
  753. { // 3
  754. stmt: &ast.StreamStmt{
  755. Name: ast.StreamName("demo"),
  756. StreamFields: []ast.StreamField{
  757. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  758. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  759. },
  760. Options: &ast.Options{
  761. DATASOURCE: "users",
  762. TIMESTAMP: "def",
  763. },
  764. },
  765. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  766. result: &xsql.Tuple{Message: xsql.Message{
  767. "abc": float64(34),
  768. "def": "2019-09-23T02:47:29.754Z",
  769. "ghi": float64(50),
  770. }, Timestamp: int64(1569206849754),
  771. },
  772. },
  773. { // 4
  774. stmt: &ast.StreamStmt{
  775. Name: ast.StreamName("demo"),
  776. StreamFields: []ast.StreamField{
  777. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  778. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  779. },
  780. Options: &ast.Options{
  781. DATASOURCE: "users",
  782. TIMESTAMP: "abc",
  783. },
  784. },
  785. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  786. result: &xsql.Tuple{Message: xsql.Message{
  787. "abc": cast.TimeFromUnixMilli(1568854515000),
  788. "def": cast.TimeFromUnixMilli(1568854573431),
  789. }, Timestamp: int64(1568854515000),
  790. },
  791. },
  792. { // 5
  793. stmt: &ast.StreamStmt{
  794. Name: ast.StreamName("demo"),
  795. StreamFields: []ast.StreamField{
  796. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  797. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  798. },
  799. Options: &ast.Options{
  800. DATASOURCE: "users",
  801. TIMESTAMP: "def",
  802. TIMESTAMP_FORMAT: "yyyy-MM-dd'AT'HH:mm:ss",
  803. },
  804. },
  805. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  806. result: &xsql.Tuple{Message: xsql.Message{
  807. "abc": float64(34),
  808. "def": "2019-09-23AT02:47:29",
  809. "ghi": float64(50),
  810. }, Timestamp: int64(1569206849000),
  811. },
  812. },
  813. { // 6
  814. stmt: &ast.StreamStmt{
  815. Name: ast.StreamName("demo"),
  816. StreamFields: []ast.StreamField{
  817. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  818. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  819. },
  820. Options: &ast.Options{
  821. DATASOURCE: "users",
  822. TIMESTAMP: "def",
  823. TIMESTAMP_FORMAT: "yyyy-MM-ddaHH:mm:ss",
  824. },
  825. },
  826. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  827. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"AT02:47:29\" as \"PM\""),
  828. },
  829. }
  830. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  831. defer conf.CloseLogger()
  832. contextLogger := conf.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  833. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  834. for i, tt := range tests {
  835. pp := &Preprocessor{
  836. checkSchema: true,
  837. defaultFieldProcessor: defaultFieldProcessor{
  838. streamFields: tt.stmt.StreamFields.ToJsonSchema(),
  839. timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
  840. },
  841. isEventTime: true,
  842. timestampField: tt.stmt.Options.TIMESTAMP,
  843. }
  844. dm := make(map[string]interface{})
  845. if e := json.Unmarshal(tt.data, &dm); e != nil {
  846. log.Fatal(e)
  847. return
  848. } else {
  849. tuple := &xsql.Tuple{Message: dm}
  850. fv, afv := xsql.NewFunctionValuersForOp(nil)
  851. result := pp.Apply(ctx, tuple, fv, afv)
  852. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  853. if rt, ok := result.(*xsql.Tuple); ok {
  854. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  855. rt.Message["abc"] = rtt.UTC()
  856. }
  857. }
  858. if !reflect.DeepEqual(tt.result, result) {
  859. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  860. }
  861. }
  862. }
  863. }
  864. func TestPreprocessorError(t *testing.T) {
  865. tests := []struct {
  866. stmt *ast.StreamStmt
  867. data []byte
  868. result interface{}
  869. }{
  870. {
  871. stmt: &ast.StreamStmt{
  872. Name: ast.StreamName("demo"),
  873. StreamFields: []ast.StreamField{
  874. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  875. },
  876. },
  877. data: []byte(`{"abc": "dafsad"}`),
  878. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(dafsad) to int64"),
  879. }, {
  880. stmt: &ast.StreamStmt{
  881. Name: ast.StreamName("demo"),
  882. StreamFields: []ast.StreamField{
  883. {Name: "a", FieldType: &ast.RecType{
  884. StreamFields: []ast.StreamField{
  885. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  886. },
  887. }},
  888. },
  889. },
  890. data: []byte(`{"a": {"d" : "hello"}}`),
  891. result: errors.New("error in preprocessor: field a type mismatch: field b is not found"),
  892. }, {
  893. stmt: &ast.StreamStmt{
  894. Name: ast.StreamName("demo"),
  895. StreamFields: []ast.StreamField{
  896. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  897. },
  898. Options: &ast.Options{
  899. DATASOURCE: "users",
  900. FORMAT: "JSON",
  901. KEY: "USERID",
  902. CONF_KEY: "srv1",
  903. TYPE: "MQTT",
  904. TIMESTAMP: "abc",
  905. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  906. },
  907. },
  908. data: []byte(`{"abc": "not a time"}`),
  909. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(not a time) to int64"),
  910. },
  911. }
  912. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  913. defer conf.CloseLogger()
  914. contextLogger := conf.Log.WithField("rule", "TestPreprocessorError")
  915. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  916. for i, tt := range tests {
  917. pp := &Preprocessor{checkSchema: true}
  918. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  919. dm := make(map[string]interface{})
  920. if e := json.Unmarshal(tt.data, &dm); e != nil {
  921. log.Fatal(e)
  922. return
  923. } else {
  924. tuple := &xsql.Tuple{Message: dm}
  925. fv, afv := xsql.NewFunctionValuersForOp(nil)
  926. result := pp.Apply(ctx, tuple, fv, afv)
  927. if !reflect.DeepEqual(tt.result, result) {
  928. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  929. }
  930. }
  931. }
  932. }
  933. func TestPreprocessorForBinary(t *testing.T) {
  934. docsFolder, err := conf.GetLoc("docs/")
  935. if err != nil {
  936. t.Errorf("Cannot find docs folder: %v", err)
  937. }
  938. image, err := os.ReadFile(path.Join(docsFolder, "cover.jpg"))
  939. if err != nil {
  940. t.Errorf("Cannot read image: %v", err)
  941. }
  942. b64img := base64.StdEncoding.EncodeToString(image)
  943. var tests = []struct {
  944. stmt *ast.StreamStmt
  945. data []byte
  946. result interface{}
  947. }{
  948. { // 0
  949. stmt: &ast.StreamStmt{
  950. Name: ast.StreamName("demo"),
  951. StreamFields: []ast.StreamField{
  952. {Name: "a", FieldType: &ast.RecType{
  953. StreamFields: []ast.StreamField{
  954. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  955. },
  956. }},
  957. },
  958. },
  959. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  960. result: &xsql.Tuple{Message: xsql.Message{
  961. "a": map[string]interface{}{
  962. "b": image,
  963. },
  964. },
  965. },
  966. },
  967. { // 1
  968. stmt: &ast.StreamStmt{
  969. Name: ast.StreamName("demo"),
  970. StreamFields: []ast.StreamField{
  971. {Name: "a", FieldType: &ast.ArrayType{
  972. Type: ast.BYTEA,
  973. }},
  974. },
  975. },
  976. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  977. result: &xsql.Tuple{Message: xsql.Message{
  978. "a": []interface{}{
  979. image,
  980. },
  981. },
  982. },
  983. },
  984. {
  985. stmt: &ast.StreamStmt{
  986. Name: ast.StreamName("demo"),
  987. StreamFields: []ast.StreamField{
  988. {Name: "a", FieldType: &ast.ArrayType{
  989. Type: ast.STRUCT,
  990. FieldType: &ast.RecType{
  991. StreamFields: []ast.StreamField{
  992. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  993. },
  994. },
  995. }},
  996. },
  997. },
  998. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  999. result: &xsql.Tuple{Message: xsql.Message{
  1000. "a": []interface{}{
  1001. map[string]interface{}{"b": image},
  1002. },
  1003. },
  1004. },
  1005. },
  1006. }
  1007. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1008. defer conf.CloseLogger()
  1009. contextLogger := conf.Log.WithField("rule", "TestPreprocessorForBinary")
  1010. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1011. for i, tt := range tests {
  1012. pp := &Preprocessor{checkSchema: true}
  1013. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  1014. format := message.FormatJson
  1015. ccc, _ := converter.GetOrCreateConverter(&ast.Options{FORMAT: format})
  1016. nCtx := context.WithValue(ctx, context.DecodeKey, ccc)
  1017. if dm, e := nCtx.Decode(tt.data); e != nil {
  1018. log.Fatal(e)
  1019. return
  1020. } else {
  1021. tuple := &xsql.Tuple{Message: dm}
  1022. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1023. result := pp.Apply(ctx, tuple, fv, afv)
  1024. if !reflect.DeepEqual(tt.result, result) {
  1025. t.Errorf("%d. %q\n\nresult mismatch", i, tuple)
  1026. }
  1027. }
  1028. }
  1029. }