preprocessor_test.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "log"
  21. "os"
  22. "path"
  23. "reflect"
  24. "testing"
  25. "time"
  26. "github.com/stretchr/testify/assert"
  27. "github.com/stretchr/testify/require"
  28. "github.com/lf-edge/ekuiper/internal/conf"
  29. "github.com/lf-edge/ekuiper/internal/converter"
  30. "github.com/lf-edge/ekuiper/internal/topo/context"
  31. "github.com/lf-edge/ekuiper/internal/xsql"
  32. "github.com/lf-edge/ekuiper/pkg/ast"
  33. "github.com/lf-edge/ekuiper/pkg/cast"
  34. "github.com/lf-edge/ekuiper/pkg/message"
  35. )
  36. func TestPreprocessor_Apply(t *testing.T) {
  37. tests := []struct {
  38. stmt *ast.StreamStmt
  39. data []byte
  40. result interface{}
  41. }{
  42. // Basic type
  43. { // 0
  44. stmt: &ast.StreamStmt{
  45. Name: ast.StreamName("demo"),
  46. StreamFields: []ast.StreamField{
  47. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  48. },
  49. },
  50. data: []byte(`{"a": 6}`),
  51. result: errors.New("error in preprocessor: field abc is not found"),
  52. },
  53. { // 1
  54. stmt: &ast.StreamStmt{
  55. Name: ast.StreamName("demo"),
  56. StreamFields: []ast.StreamField{
  57. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  58. },
  59. },
  60. data: []byte(`{"abc": null}`),
  61. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert <nil>(<nil>) to int64"),
  62. },
  63. { // 2
  64. stmt: &ast.StreamStmt{
  65. Name: ast.StreamName("demo"),
  66. StreamFields: nil,
  67. },
  68. data: []byte(`{"a": 6}`),
  69. result: &xsql.Tuple{
  70. Message: xsql.Message{
  71. "a": float64(6),
  72. },
  73. },
  74. },
  75. { // 3
  76. stmt: &ast.StreamStmt{
  77. Name: ast.StreamName("demo"),
  78. StreamFields: []ast.StreamField{
  79. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  80. },
  81. },
  82. data: []byte(`{"abc": 6}`),
  83. result: &xsql.Tuple{
  84. Message: xsql.Message{
  85. "abc": int64(6),
  86. },
  87. },
  88. },
  89. { // 4
  90. stmt: &ast.StreamStmt{
  91. Name: ast.StreamName("demo"),
  92. StreamFields: nil,
  93. },
  94. data: []byte(`{"abc": 6}`),
  95. result: &xsql.Tuple{
  96. Message: xsql.Message{
  97. "abc": float64(6),
  98. },
  99. },
  100. },
  101. { // 5
  102. stmt: &ast.StreamStmt{
  103. Name: ast.StreamName("demo"),
  104. StreamFields: []ast.StreamField{
  105. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  106. {Name: "dEf", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  107. },
  108. },
  109. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  110. result: &xsql.Tuple{
  111. Message: xsql.Message{
  112. "abc": float64(34),
  113. "dEf": "hello",
  114. "def": "hello",
  115. "ghi": float64(50),
  116. },
  117. },
  118. },
  119. { // 6
  120. stmt: &ast.StreamStmt{
  121. Name: ast.StreamName("demo"),
  122. StreamFields: nil,
  123. },
  124. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  125. result: &xsql.Tuple{
  126. Message: xsql.Message{
  127. "abc": float64(34),
  128. "def": "hello",
  129. "ghi": float64(50),
  130. },
  131. },
  132. },
  133. { // 7
  134. stmt: &ast.StreamStmt{
  135. Name: ast.StreamName("demo"),
  136. StreamFields: []ast.StreamField{
  137. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  138. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  139. },
  140. },
  141. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  142. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(34) to float64"),
  143. },
  144. { // 8
  145. stmt: &ast.StreamStmt{
  146. Name: ast.StreamName("demo"),
  147. StreamFields: []ast.StreamField{
  148. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  149. {Name: "def", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  150. },
  151. },
  152. data: []byte(`{"abc": 77, "def" : "hello"}`),
  153. result: errors.New("error in preprocessor: field def type mismatch: cannot convert string(hello) to bool"),
  154. },
  155. { // 9
  156. stmt: &ast.StreamStmt{
  157. Name: ast.StreamName("demo"),
  158. StreamFields: []ast.StreamField{
  159. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  160. },
  161. },
  162. data: []byte(`{"a": {"b" : "hello"}}`),
  163. result: errors.New("error in preprocessor: field abc is not found"),
  164. },
  165. { // 10
  166. stmt: &ast.StreamStmt{
  167. Name: ast.StreamName("demo"),
  168. StreamFields: nil,
  169. },
  170. data: []byte(`{"a": {"b" : "hello"}}`),
  171. result: &xsql.Tuple{
  172. Message: xsql.Message{
  173. "a": map[string]interface{}{
  174. "b": "hello",
  175. },
  176. },
  177. },
  178. },
  179. // Rec type
  180. { // 11
  181. stmt: &ast.StreamStmt{
  182. Name: ast.StreamName("demo"),
  183. StreamFields: []ast.StreamField{
  184. {Name: "a", FieldType: &ast.RecType{
  185. StreamFields: []ast.StreamField{
  186. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  187. },
  188. }},
  189. },
  190. },
  191. data: []byte(`{"a": {"b" : "hello"}}`),
  192. result: &xsql.Tuple{
  193. Message: xsql.Message{
  194. "a": map[string]interface{}{
  195. "b": "hello",
  196. },
  197. },
  198. },
  199. },
  200. { // 12
  201. stmt: &ast.StreamStmt{
  202. Name: ast.StreamName("demo"),
  203. StreamFields: []ast.StreamField{
  204. {Name: "a", FieldType: &ast.RecType{
  205. StreamFields: []ast.StreamField{
  206. {Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  207. },
  208. }},
  209. },
  210. },
  211. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  212. result: errors.New("error in preprocessor: field a type mismatch: field b type mismatch: cannot convert string(32) to float64"),
  213. },
  214. { // 13
  215. stmt: &ast.StreamStmt{
  216. Name: ast.StreamName("demo"),
  217. StreamFields: nil,
  218. },
  219. data: []byte(`{"a": {"b" : "32"}}`),
  220. result: &xsql.Tuple{
  221. Message: xsql.Message{
  222. "a": map[string]interface{}{
  223. "b": "32",
  224. },
  225. },
  226. },
  227. },
  228. // Array of complex type
  229. { // 14
  230. stmt: &ast.StreamStmt{
  231. Name: ast.StreamName("demo"),
  232. StreamFields: []ast.StreamField{
  233. {Name: "a", FieldType: &ast.ArrayType{
  234. Type: ast.STRUCT,
  235. FieldType: &ast.RecType{
  236. StreamFields: []ast.StreamField{
  237. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  238. },
  239. },
  240. }},
  241. },
  242. },
  243. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  244. result: &xsql.Tuple{
  245. Message: xsql.Message{
  246. "a": []interface{}{
  247. map[string]interface{}{"b": "hello1"},
  248. map[string]interface{}{"b": "hello2"},
  249. },
  250. },
  251. },
  252. },
  253. { // 15
  254. stmt: &ast.StreamStmt{
  255. Name: ast.StreamName("demo"),
  256. StreamFields: []ast.StreamField{
  257. {Name: "a", FieldType: &ast.ArrayType{
  258. Type: ast.STRUCT,
  259. FieldType: &ast.RecType{
  260. StreamFields: []ast.StreamField{
  261. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  262. },
  263. },
  264. }},
  265. },
  266. },
  267. data: []byte(`{"a": []}`),
  268. result: &xsql.Tuple{
  269. Message: xsql.Message{
  270. "a": make([]interface{}, 0),
  271. },
  272. },
  273. },
  274. { // 16
  275. stmt: &ast.StreamStmt{
  276. Name: ast.StreamName("demo"),
  277. StreamFields: []ast.StreamField{
  278. {Name: "a", FieldType: &ast.ArrayType{
  279. Type: ast.STRUCT,
  280. FieldType: &ast.RecType{
  281. StreamFields: []ast.StreamField{
  282. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  283. },
  284. },
  285. }},
  286. },
  287. },
  288. data: []byte(`{"a": null}`),
  289. result: &xsql.Tuple{
  290. Message: xsql.Message{
  291. "a": []interface{}(nil),
  292. },
  293. },
  294. },
  295. { // 17
  296. stmt: &ast.StreamStmt{
  297. Name: ast.StreamName("demo"),
  298. StreamFields: []ast.StreamField{
  299. {Name: "a", FieldType: &ast.ArrayType{
  300. Type: ast.STRUCT,
  301. FieldType: &ast.RecType{
  302. StreamFields: []ast.StreamField{
  303. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  304. },
  305. },
  306. }},
  307. },
  308. },
  309. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  310. result: &xsql.Tuple{
  311. Message: xsql.Message{
  312. "a": []interface{}{
  313. map[string]interface{}(nil),
  314. map[string]interface{}{"b": "hello2"},
  315. },
  316. },
  317. },
  318. },
  319. { // 18
  320. stmt: &ast.StreamStmt{
  321. Name: ast.StreamName("demo"),
  322. StreamFields: []ast.StreamField{
  323. {Name: "a", FieldType: &ast.ArrayType{
  324. Type: ast.ARRAY,
  325. FieldType: &ast.ArrayType{
  326. Type: ast.BIGINT,
  327. },
  328. }},
  329. },
  330. },
  331. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  332. result: &xsql.Tuple{
  333. Message: xsql.Message{
  334. "a": []interface{}{
  335. []interface{}{int64(50), int64(60), int64(70)},
  336. []interface{}{int64(66)},
  337. []interface{}{int64(77)},
  338. },
  339. },
  340. },
  341. },
  342. { // 19
  343. stmt: &ast.StreamStmt{
  344. Name: ast.StreamName("demo"),
  345. StreamFields: []ast.StreamField{
  346. {Name: "a", FieldType: &ast.ArrayType{
  347. Type: ast.ARRAY,
  348. FieldType: &ast.ArrayType{
  349. Type: ast.BIGINT,
  350. },
  351. }},
  352. },
  353. },
  354. data: []byte(`{"a": [null, [66], [77]]}`),
  355. result: &xsql.Tuple{
  356. Message: xsql.Message{
  357. "a": []interface{}{
  358. []interface{}(nil),
  359. []interface{}{int64(66)},
  360. []interface{}{int64(77)},
  361. },
  362. },
  363. },
  364. },
  365. { // 20
  366. stmt: &ast.StreamStmt{
  367. Name: ast.StreamName("demo"),
  368. StreamFields: nil,
  369. },
  370. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  371. result: &xsql.Tuple{
  372. Message: xsql.Message{
  373. "a": []interface{}{
  374. map[string]interface{}{"b": "hello1"},
  375. map[string]interface{}{"b": "hello2"},
  376. },
  377. },
  378. },
  379. },
  380. { // 21
  381. stmt: &ast.StreamStmt{
  382. Name: ast.StreamName("demo"),
  383. StreamFields: []ast.StreamField{
  384. {Name: "a", FieldType: &ast.ArrayType{
  385. Type: ast.FLOAT,
  386. }},
  387. },
  388. },
  389. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  390. result: errors.New("error in preprocessor: field a type mismatch: expect array but got [\"55\", \"77\"]"),
  391. },
  392. { // 22
  393. stmt: &ast.StreamStmt{
  394. Name: ast.StreamName("demo"),
  395. StreamFields: nil,
  396. },
  397. data: []byte(`{"a": [55, 77]}`),
  398. result: &xsql.Tuple{
  399. Message: xsql.Message{
  400. "a": []interface{}{
  401. float64(55),
  402. float64(77),
  403. },
  404. },
  405. },
  406. },
  407. // Rec of complex type
  408. { // 23
  409. stmt: &ast.StreamStmt{
  410. Name: ast.StreamName("demo"),
  411. StreamFields: []ast.StreamField{
  412. {Name: "a", FieldType: &ast.RecType{
  413. StreamFields: []ast.StreamField{
  414. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  415. {Name: "c", FieldType: &ast.RecType{
  416. StreamFields: []ast.StreamField{
  417. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  418. },
  419. }},
  420. },
  421. }},
  422. },
  423. },
  424. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  425. result: &xsql.Tuple{
  426. Message: xsql.Message{
  427. "a": map[string]interface{}{
  428. "b": "hello",
  429. "c": map[string]interface{}{
  430. "d": int64(35),
  431. },
  432. },
  433. },
  434. },
  435. },
  436. { // 24
  437. stmt: &ast.StreamStmt{
  438. Name: ast.StreamName("demo"),
  439. StreamFields: []ast.StreamField{
  440. {Name: "a", FieldType: &ast.RecType{
  441. StreamFields: []ast.StreamField{
  442. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  443. {Name: "c", FieldType: &ast.RecType{
  444. StreamFields: []ast.StreamField{
  445. {Name: "d", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  446. },
  447. }},
  448. },
  449. }},
  450. },
  451. },
  452. data: []byte(`{"a": null}`),
  453. result: &xsql.Tuple{
  454. Message: xsql.Message{
  455. "a": map[string]interface{}(nil),
  456. },
  457. },
  458. },
  459. { // 25
  460. stmt: &ast.StreamStmt{
  461. Name: ast.StreamName("demo"),
  462. StreamFields: []ast.StreamField{
  463. {Name: "a", FieldType: &ast.RecType{
  464. StreamFields: []ast.StreamField{
  465. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  466. {Name: "c", FieldType: &ast.ArrayType{
  467. Type: ast.FLOAT,
  468. }},
  469. },
  470. }},
  471. },
  472. },
  473. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  474. result: &xsql.Tuple{
  475. Message: xsql.Message{
  476. "a": map[string]interface{}{
  477. "b": "hello",
  478. "c": []interface{}{
  479. 35.2, 38.2,
  480. },
  481. },
  482. },
  483. },
  484. },
  485. { // 26
  486. stmt: &ast.StreamStmt{
  487. Name: ast.StreamName("demo"),
  488. StreamFields: []ast.StreamField{
  489. {Name: "a", FieldType: &ast.RecType{
  490. StreamFields: []ast.StreamField{
  491. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  492. {Name: "c", FieldType: &ast.ArrayType{
  493. Type: ast.FLOAT,
  494. }},
  495. },
  496. }},
  497. },
  498. },
  499. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  500. result: &xsql.Tuple{
  501. Message: xsql.Message{
  502. "a": map[string]interface{}{
  503. "b": "hello",
  504. "c": []interface{}(nil),
  505. },
  506. },
  507. },
  508. },
  509. { // 27
  510. stmt: &ast.StreamStmt{
  511. Name: ast.StreamName("demo"),
  512. StreamFields: []ast.StreamField{
  513. {Name: "a", FieldType: &ast.RecType{
  514. StreamFields: []ast.StreamField{
  515. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  516. {Name: "c", FieldType: &ast.ArrayType{
  517. Type: ast.FLOAT,
  518. }},
  519. },
  520. }},
  521. },
  522. },
  523. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  524. result: errors.New("error in preprocessor: field a type mismatch: field c type mismatch: array element type mismatch: cannot convert <nil>(<nil>) to float64"),
  525. },
  526. { // 28
  527. stmt: &ast.StreamStmt{
  528. Name: ast.StreamName("demo"),
  529. StreamFields: nil,
  530. },
  531. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  532. result: &xsql.Tuple{
  533. Message: xsql.Message{
  534. "a": map[string]interface{}{
  535. "b": "hello",
  536. "c": map[string]interface{}{
  537. "d": 35.2,
  538. },
  539. },
  540. },
  541. },
  542. },
  543. }
  544. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  545. defer conf.CloseLogger()
  546. contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
  547. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  548. for i, tt := range tests {
  549. pp := &Preprocessor{checkSchema: true}
  550. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  551. dm := make(map[string]interface{})
  552. if e := json.Unmarshal(tt.data, &dm); e != nil {
  553. log.Fatal(e)
  554. return
  555. } else {
  556. tuple := &xsql.Tuple{Message: dm}
  557. fv, afv := xsql.NewFunctionValuersForOp(nil)
  558. result := pp.Apply(ctx, tuple, fv, afv)
  559. if !reflect.DeepEqual(tt.result, result) {
  560. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  561. }
  562. }
  563. }
  564. }
  565. func TestPreprocessorTime_Apply(t *testing.T) {
  566. err := cast.SetTimeZone("Local")
  567. require.NoError(t, err)
  568. tests := []struct {
  569. stmt *ast.StreamStmt
  570. data []byte
  571. result interface{}
  572. }{
  573. { // 0
  574. stmt: &ast.StreamStmt{
  575. Name: ast.StreamName("demo"),
  576. StreamFields: []ast.StreamField{
  577. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  578. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  579. },
  580. },
  581. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  582. result: &xsql.Tuple{
  583. Message: xsql.Message{
  584. "abc": cast.TimeFromUnixMilli(1568854515000),
  585. "def": cast.TimeFromUnixMilli(1568854573431),
  586. },
  587. },
  588. },
  589. { // 1
  590. stmt: &ast.StreamStmt{
  591. Name: ast.StreamName("demo"),
  592. StreamFields: nil,
  593. },
  594. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  595. result: &xsql.Tuple{
  596. Message: xsql.Message{
  597. "abc": "2019-09-19T00:55:15.000Z",
  598. "def": float64(1568854573431),
  599. },
  600. },
  601. },
  602. { // 2
  603. stmt: &ast.StreamStmt{
  604. Name: ast.StreamName("demo"),
  605. StreamFields: []ast.StreamField{
  606. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  607. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  608. },
  609. },
  610. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  611. result: errors.New("error in preprocessor: field abc type mismatch: Can't parse string as time: 2019-09-19T00:55:1dd5Z"),
  612. },
  613. { // 3
  614. stmt: &ast.StreamStmt{
  615. Name: ast.StreamName("demo"),
  616. StreamFields: []ast.StreamField{
  617. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  618. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  619. },
  620. Options: &ast.Options{
  621. DATASOURCE: "users",
  622. FORMAT: "JSON",
  623. KEY: "USERID",
  624. CONF_KEY: "srv1",
  625. TYPE: "MQTT",
  626. TIMESTAMP: "USERID",
  627. TIMESTAMP_FORMAT: "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  628. },
  629. },
  630. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  631. result: &xsql.Tuple{Message: xsql.Message{
  632. "abc": cast.TimeFromUnixMilli(1568894115000),
  633. "def": cast.TimeFromUnixMilli(1568854573431),
  634. }},
  635. },
  636. // Array type
  637. { // 4
  638. stmt: &ast.StreamStmt{
  639. Name: ast.StreamName("demo"),
  640. StreamFields: []ast.StreamField{
  641. {Name: "a", FieldType: &ast.ArrayType{
  642. Type: ast.DATETIME,
  643. }},
  644. },
  645. },
  646. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  647. result: &xsql.Tuple{
  648. Message: xsql.Message{
  649. "a": []interface{}{
  650. cast.TimeFromUnixMilli(1568854515123),
  651. cast.TimeFromUnixMilli(1568854573431),
  652. },
  653. },
  654. },
  655. },
  656. { // 5
  657. stmt: &ast.StreamStmt{
  658. Name: ast.StreamName("demo"),
  659. StreamFields: []ast.StreamField{
  660. {Name: "a", FieldType: &ast.RecType{
  661. StreamFields: []ast.StreamField{
  662. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  663. {Name: "c", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  664. },
  665. }},
  666. },
  667. },
  668. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  669. result: &xsql.Tuple{
  670. Message: xsql.Message{
  671. "a": map[string]interface{}{
  672. "b": "hello",
  673. "c": cast.TimeFromUnixMilli(1568854515000),
  674. },
  675. },
  676. },
  677. },
  678. }
  679. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  680. defer conf.CloseLogger()
  681. contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
  682. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  683. for i, tt := range tests {
  684. timestampFormat := ""
  685. if tt.stmt.Options != nil {
  686. timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
  687. }
  688. pp, e := NewPreprocessor(false, tt.stmt.StreamFields.ToJsonSchema(), false, nil, false, "", timestampFormat, false, true)
  689. assert.NoError(t, e)
  690. dm := make(map[string]interface{})
  691. if e := json.Unmarshal(tt.data, &dm); e != nil {
  692. log.Fatal(e)
  693. return
  694. } else {
  695. tuple := &xsql.Tuple{Message: dm}
  696. fv, afv := xsql.NewFunctionValuersForOp(nil)
  697. result := pp.Apply(ctx, tuple, fv, afv)
  698. // workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  699. if rt, ok := result.(*xsql.Tuple); ok {
  700. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  701. rt.Message["abc"] = rtt.Local()
  702. }
  703. }
  704. if !reflect.DeepEqual(tt.result, result) {
  705. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  706. }
  707. }
  708. }
  709. }
  710. func convertFields(o ast.StreamFields) []interface{} {
  711. if o == nil {
  712. return nil
  713. }
  714. fields := make([]interface{}, len(o))
  715. for i := range o {
  716. fields[i] = &o[i]
  717. }
  718. return fields
  719. }
  720. func TestPreprocessorEventtime_Apply(t *testing.T) {
  721. err := cast.SetTimeZone("UTC")
  722. require.NoError(t, err)
  723. tests := []struct {
  724. stmt *ast.StreamStmt
  725. data []byte
  726. result interface{}
  727. }{
  728. // Basic type
  729. { // 0
  730. stmt: &ast.StreamStmt{
  731. Name: ast.StreamName("demo"),
  732. StreamFields: []ast.StreamField{
  733. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  734. },
  735. Options: &ast.Options{
  736. DATASOURCE: "users",
  737. FORMAT: "JSON",
  738. KEY: "USERID",
  739. CONF_KEY: "srv1",
  740. TYPE: "MQTT",
  741. TIMESTAMP: "abc",
  742. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  743. },
  744. },
  745. data: []byte(`{"abc": 1568854515000}`),
  746. result: &xsql.Tuple{
  747. Message: xsql.Message{
  748. "abc": int64(1568854515000),
  749. }, Timestamp: 1568854515000,
  750. },
  751. },
  752. { // 1
  753. stmt: &ast.StreamStmt{
  754. Name: ast.StreamName("demo"),
  755. StreamFields: nil,
  756. Options: &ast.Options{
  757. DATASOURCE: "users",
  758. FORMAT: "JSON",
  759. KEY: "USERID",
  760. CONF_KEY: "srv1",
  761. TYPE: "MQTT",
  762. TIMESTAMP: "abc",
  763. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  764. },
  765. },
  766. data: []byte(`{"abc": 1568854515000}`),
  767. result: &xsql.Tuple{
  768. Message: xsql.Message{
  769. "abc": float64(1568854515000),
  770. }, Timestamp: 1568854515000,
  771. },
  772. },
  773. { // 2
  774. stmt: &ast.StreamStmt{
  775. Name: ast.StreamName("demo"),
  776. StreamFields: []ast.StreamField{
  777. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  778. },
  779. Options: &ast.Options{
  780. DATASOURCE: "users",
  781. TIMESTAMP: "abc",
  782. },
  783. },
  784. data: []byte(`{"abc": true}`),
  785. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  786. },
  787. { // 3
  788. stmt: &ast.StreamStmt{
  789. Name: ast.StreamName("demo"),
  790. StreamFields: []ast.StreamField{
  791. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  792. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  793. },
  794. Options: &ast.Options{
  795. DATASOURCE: "users",
  796. TIMESTAMP: "def",
  797. },
  798. },
  799. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  800. result: &xsql.Tuple{
  801. Message: xsql.Message{
  802. "abc": float64(34),
  803. "def": "2019-09-23T02:47:29.754Z",
  804. "ghi": float64(50),
  805. }, Timestamp: int64(1569206849754),
  806. },
  807. },
  808. { // 4
  809. stmt: &ast.StreamStmt{
  810. Name: ast.StreamName("demo"),
  811. StreamFields: []ast.StreamField{
  812. {Name: "abc", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  813. {Name: "def", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  814. },
  815. Options: &ast.Options{
  816. DATASOURCE: "users",
  817. TIMESTAMP: "abc",
  818. },
  819. },
  820. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  821. result: &xsql.Tuple{
  822. Message: xsql.Message{
  823. "abc": cast.TimeFromUnixMilli(1568854515000),
  824. "def": cast.TimeFromUnixMilli(1568854573431),
  825. }, Timestamp: int64(1568854515000),
  826. },
  827. },
  828. { // 5
  829. stmt: &ast.StreamStmt{
  830. Name: ast.StreamName("demo"),
  831. StreamFields: []ast.StreamField{
  832. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  833. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  834. },
  835. Options: &ast.Options{
  836. DATASOURCE: "users",
  837. TIMESTAMP: "def",
  838. TIMESTAMP_FORMAT: "yyyy-MM-dd'AT'HH:mm:ss",
  839. },
  840. },
  841. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  842. result: &xsql.Tuple{
  843. Message: xsql.Message{
  844. "abc": float64(34),
  845. "def": "2019-09-23AT02:47:29",
  846. "ghi": float64(50),
  847. }, Timestamp: int64(1569206849000),
  848. },
  849. },
  850. { // 6
  851. stmt: &ast.StreamStmt{
  852. Name: ast.StreamName("demo"),
  853. StreamFields: []ast.StreamField{
  854. {Name: "abc", FieldType: &ast.BasicType{Type: ast.FLOAT}},
  855. {Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  856. },
  857. Options: &ast.Options{
  858. DATASOURCE: "users",
  859. TIMESTAMP: "def",
  860. TIMESTAMP_FORMAT: "yyyy-MM-ddaHH:mm:ss",
  861. },
  862. },
  863. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  864. result: errors.New("cannot convert timestamp field def to timestamp with error Can't parse string as time: 2019-09-23AT02:47:29"),
  865. },
  866. }
  867. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  868. defer conf.CloseLogger()
  869. contextLogger := conf.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  870. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  871. for i, tt := range tests {
  872. pp := &Preprocessor{
  873. checkSchema: true,
  874. defaultFieldProcessor: defaultFieldProcessor{
  875. streamFields: tt.stmt.StreamFields.ToJsonSchema(),
  876. timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
  877. },
  878. isEventTime: true,
  879. timestampField: tt.stmt.Options.TIMESTAMP,
  880. }
  881. dm := make(map[string]interface{})
  882. if e := json.Unmarshal(tt.data, &dm); e != nil {
  883. log.Fatal(e)
  884. return
  885. } else {
  886. tuple := &xsql.Tuple{Message: dm}
  887. fv, afv := xsql.NewFunctionValuersForOp(nil)
  888. result := pp.Apply(ctx, tuple, fv, afv)
  889. // workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  890. if rt, ok := result.(*xsql.Tuple); ok {
  891. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  892. rt.Message["abc"] = rtt.UTC()
  893. }
  894. }
  895. if !reflect.DeepEqual(tt.result, result) {
  896. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  897. }
  898. }
  899. }
  900. }
  901. func TestPreprocessorError(t *testing.T) {
  902. tests := []struct {
  903. stmt *ast.StreamStmt
  904. data []byte
  905. result interface{}
  906. }{
  907. {
  908. stmt: &ast.StreamStmt{
  909. Name: ast.StreamName("demo"),
  910. StreamFields: []ast.StreamField{
  911. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  912. },
  913. },
  914. data: []byte(`{"abc": "dafsad"}`),
  915. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(dafsad) to int64"),
  916. }, {
  917. stmt: &ast.StreamStmt{
  918. Name: ast.StreamName("demo"),
  919. StreamFields: []ast.StreamField{
  920. {Name: "a", FieldType: &ast.RecType{
  921. StreamFields: []ast.StreamField{
  922. {Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  923. },
  924. }},
  925. },
  926. },
  927. data: []byte(`{"a": {"d" : "hello"}}`),
  928. result: errors.New("error in preprocessor: field a type mismatch: field b is not found"),
  929. }, {
  930. stmt: &ast.StreamStmt{
  931. Name: ast.StreamName("demo"),
  932. StreamFields: []ast.StreamField{
  933. {Name: "abc", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  934. },
  935. Options: &ast.Options{
  936. DATASOURCE: "users",
  937. FORMAT: "JSON",
  938. KEY: "USERID",
  939. CONF_KEY: "srv1",
  940. TYPE: "MQTT",
  941. TIMESTAMP: "abc",
  942. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  943. },
  944. },
  945. data: []byte(`{"abc": "not a time"}`),
  946. result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(not a time) to int64"),
  947. },
  948. }
  949. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  950. defer conf.CloseLogger()
  951. contextLogger := conf.Log.WithField("rule", "TestPreprocessorError")
  952. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  953. for i, tt := range tests {
  954. pp := &Preprocessor{checkSchema: true}
  955. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  956. dm := make(map[string]interface{})
  957. if e := json.Unmarshal(tt.data, &dm); e != nil {
  958. log.Fatal(e)
  959. return
  960. } else {
  961. tuple := &xsql.Tuple{Message: dm}
  962. fv, afv := xsql.NewFunctionValuersForOp(nil)
  963. result := pp.Apply(ctx, tuple, fv, afv)
  964. if !reflect.DeepEqual(tt.result, result) {
  965. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  966. }
  967. }
  968. }
  969. }
  970. func TestPreprocessorForBinary(t *testing.T) {
  971. docsFolder, err := conf.GetLoc("docs/")
  972. if err != nil {
  973. t.Errorf("Cannot find docs folder: %v", err)
  974. }
  975. image, err := os.ReadFile(path.Join(docsFolder, "cover.jpg"))
  976. if err != nil {
  977. t.Errorf("Cannot read image: %v", err)
  978. }
  979. b64img := base64.StdEncoding.EncodeToString(image)
  980. tests := []struct {
  981. stmt *ast.StreamStmt
  982. data []byte
  983. result interface{}
  984. }{
  985. { // 0
  986. stmt: &ast.StreamStmt{
  987. Name: ast.StreamName("demo"),
  988. StreamFields: []ast.StreamField{
  989. {Name: "a", FieldType: &ast.RecType{
  990. StreamFields: []ast.StreamField{
  991. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  992. },
  993. }},
  994. },
  995. },
  996. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  997. result: &xsql.Tuple{
  998. Message: xsql.Message{
  999. "a": map[string]interface{}{
  1000. "b": image,
  1001. },
  1002. },
  1003. },
  1004. },
  1005. { // 1
  1006. stmt: &ast.StreamStmt{
  1007. Name: ast.StreamName("demo"),
  1008. StreamFields: []ast.StreamField{
  1009. {Name: "a", FieldType: &ast.ArrayType{
  1010. Type: ast.BYTEA,
  1011. }},
  1012. },
  1013. },
  1014. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  1015. result: &xsql.Tuple{
  1016. Message: xsql.Message{
  1017. "a": []interface{}{
  1018. image,
  1019. },
  1020. },
  1021. },
  1022. },
  1023. {
  1024. stmt: &ast.StreamStmt{
  1025. Name: ast.StreamName("demo"),
  1026. StreamFields: []ast.StreamField{
  1027. {Name: "a", FieldType: &ast.ArrayType{
  1028. Type: ast.STRUCT,
  1029. FieldType: &ast.RecType{
  1030. StreamFields: []ast.StreamField{
  1031. {Name: "b", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  1032. },
  1033. },
  1034. }},
  1035. },
  1036. },
  1037. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  1038. result: &xsql.Tuple{
  1039. Message: xsql.Message{
  1040. "a": []interface{}{
  1041. map[string]interface{}{"b": image},
  1042. },
  1043. },
  1044. },
  1045. },
  1046. }
  1047. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1048. defer conf.CloseLogger()
  1049. contextLogger := conf.Log.WithField("rule", "TestPreprocessorForBinary")
  1050. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1051. for i, tt := range tests {
  1052. pp := &Preprocessor{checkSchema: true}
  1053. pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
  1054. format := message.FormatJson
  1055. ccc, _ := converter.GetOrCreateConverter(&ast.Options{FORMAT: format})
  1056. nCtx := context.WithValue(ctx, context.DecodeKey, ccc)
  1057. if dm, e := nCtx.Decode(tt.data); e != nil {
  1058. log.Fatal(e)
  1059. return
  1060. } else {
  1061. tuple := &xsql.Tuple{Message: dm}
  1062. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1063. result := pp.Apply(ctx, tuple, fv, afv)
  1064. if !reflect.DeepEqual(tt.result, result) {
  1065. t.Errorf("%d. %q\n\nresult mismatch", i, tuple)
  1066. }
  1067. }
  1068. }
  1069. }