preprocessor_test.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073
  1. package operators
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xsql"
  9. "github.com/emqx/kuiper/xstream/contexts"
  10. "io/ioutil"
  11. "log"
  12. "path"
  13. "reflect"
  14. "testing"
  15. "time"
  16. )
  17. func TestPreprocessor_Apply(t *testing.T) {
  18. var tests = []struct {
  19. stmt *xsql.StreamStmt
  20. data []byte
  21. result interface{}
  22. }{
  23. //Basic type
  24. {
  25. stmt: &xsql.StreamStmt{
  26. Name: xsql.StreamName("demo"),
  27. StreamFields: []xsql.StreamField{
  28. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  29. },
  30. },
  31. data: []byte(`{"a": 6}`),
  32. result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
  33. },
  34. {
  35. stmt: &xsql.StreamStmt{
  36. Name: xsql.StreamName("demo"),
  37. StreamFields: []xsql.StreamField{
  38. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  39. },
  40. },
  41. data: []byte(`{"abc": null}`),
  42. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found <nil>(<nil>)"),
  43. },
  44. {
  45. stmt: &xsql.StreamStmt{
  46. Name: xsql.StreamName("demo"),
  47. StreamFields: nil,
  48. },
  49. data: []byte(`{"a": 6}`),
  50. result: &xsql.Tuple{Message: xsql.Message{
  51. "a": float64(6),
  52. },
  53. },
  54. },
  55. {
  56. stmt: &xsql.StreamStmt{
  57. Name: xsql.StreamName("demo"),
  58. StreamFields: []xsql.StreamField{
  59. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  60. },
  61. },
  62. data: []byte(`{"abc": 6}`),
  63. result: &xsql.Tuple{Message: xsql.Message{
  64. "abc": 6,
  65. },
  66. },
  67. },
  68. {
  69. stmt: &xsql.StreamStmt{
  70. Name: xsql.StreamName("demo"),
  71. StreamFields: nil,
  72. },
  73. data: []byte(`{"abc": 6}`),
  74. result: &xsql.Tuple{Message: xsql.Message{
  75. "abc": float64(6),
  76. },
  77. },
  78. },
  79. {
  80. stmt: &xsql.StreamStmt{
  81. Name: xsql.StreamName("demo"),
  82. StreamFields: []xsql.StreamField{
  83. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  84. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  85. },
  86. },
  87. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  88. result: &xsql.Tuple{Message: xsql.Message{
  89. "abc": float64(34),
  90. "def": "hello",
  91. },
  92. },
  93. },
  94. {
  95. stmt: &xsql.StreamStmt{
  96. Name: xsql.StreamName("demo"),
  97. StreamFields: nil,
  98. },
  99. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  100. result: &xsql.Tuple{Message: xsql.Message{
  101. "abc": float64(34),
  102. "def": "hello",
  103. "ghi": float64(50),
  104. },
  105. },
  106. },
  107. {
  108. stmt: &xsql.StreamStmt{
  109. Name: xsql.StreamName("demo"),
  110. StreamFields: []xsql.StreamField{
  111. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  112. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  113. },
  114. },
  115. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  116. result: &xsql.Tuple{Message: xsql.Message{
  117. "abc": float64(34),
  118. "def": "hello",
  119. },
  120. },
  121. },
  122. {
  123. stmt: &xsql.StreamStmt{
  124. Name: xsql.StreamName("demo"),
  125. StreamFields: []xsql.StreamField{
  126. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  127. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  128. },
  129. },
  130. data: []byte(`{"abc": 77, "def" : "hello"}`),
  131. result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
  132. },
  133. {
  134. stmt: &xsql.StreamStmt{
  135. Name: xsql.StreamName("demo"),
  136. StreamFields: []xsql.StreamField{
  137. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  138. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  139. },
  140. },
  141. data: []byte(`{"a": {"b" : "hello"}}`),
  142. result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
  143. },
  144. {
  145. stmt: &xsql.StreamStmt{
  146. Name: xsql.StreamName("demo"),
  147. StreamFields: nil,
  148. },
  149. data: []byte(`{"a": {"b" : "hello"}}`),
  150. result: &xsql.Tuple{Message: xsql.Message{
  151. "a": map[string]interface{}{
  152. "b": "hello",
  153. },
  154. },
  155. },
  156. },
  157. //Rec type
  158. {
  159. stmt: &xsql.StreamStmt{
  160. Name: xsql.StreamName("demo"),
  161. StreamFields: []xsql.StreamField{
  162. {Name: "a", FieldType: &xsql.RecType{
  163. StreamFields: []xsql.StreamField{
  164. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  165. },
  166. }},
  167. },
  168. },
  169. data: []byte(`{"a": {"b" : "hello"}}`),
  170. result: &xsql.Tuple{Message: xsql.Message{
  171. "a": map[string]interface{}{
  172. "b": "hello",
  173. },
  174. },
  175. },
  176. },
  177. {
  178. stmt: &xsql.StreamStmt{
  179. Name: xsql.StreamName("demo"),
  180. StreamFields: []xsql.StreamField{
  181. {Name: "a", FieldType: &xsql.RecType{
  182. StreamFields: []xsql.StreamField{
  183. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  184. },
  185. }},
  186. },
  187. },
  188. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  189. result: &xsql.Tuple{Message: xsql.Message{
  190. "a": map[string]interface{}{
  191. "b": float64(32),
  192. },
  193. },
  194. },
  195. },
  196. {
  197. stmt: &xsql.StreamStmt{
  198. Name: xsql.StreamName("demo"),
  199. StreamFields: nil,
  200. },
  201. data: []byte(`{"a": {"b" : "32"}}`),
  202. result: &xsql.Tuple{Message: xsql.Message{
  203. "a": map[string]interface{}{
  204. "b": "32",
  205. },
  206. },
  207. },
  208. },
  209. //Array of complex type
  210. {
  211. stmt: &xsql.StreamStmt{
  212. Name: xsql.StreamName("demo"),
  213. StreamFields: []xsql.StreamField{
  214. {Name: "a", FieldType: &xsql.ArrayType{
  215. Type: xsql.STRUCT,
  216. FieldType: &xsql.RecType{
  217. StreamFields: []xsql.StreamField{
  218. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  219. },
  220. },
  221. }},
  222. },
  223. },
  224. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  225. result: &xsql.Tuple{Message: xsql.Message{
  226. "a": []map[string]interface{}{
  227. {"b": "hello1"},
  228. {"b": "hello2"},
  229. },
  230. },
  231. },
  232. },
  233. {
  234. stmt: &xsql.StreamStmt{
  235. Name: xsql.StreamName("demo"),
  236. StreamFields: []xsql.StreamField{
  237. {Name: "a", FieldType: &xsql.ArrayType{
  238. Type: xsql.STRUCT,
  239. FieldType: &xsql.RecType{
  240. StreamFields: []xsql.StreamField{
  241. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  242. },
  243. },
  244. }},
  245. },
  246. },
  247. data: []byte(`{"a": []}`),
  248. result: &xsql.Tuple{Message: xsql.Message{
  249. "a": make([]map[string]interface{}, 0),
  250. },
  251. },
  252. },
  253. {
  254. stmt: &xsql.StreamStmt{
  255. Name: xsql.StreamName("demo"),
  256. StreamFields: []xsql.StreamField{
  257. {Name: "a", FieldType: &xsql.ArrayType{
  258. Type: xsql.STRUCT,
  259. FieldType: &xsql.RecType{
  260. StreamFields: []xsql.StreamField{
  261. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  262. },
  263. },
  264. }},
  265. },
  266. },
  267. data: []byte(`{"a": null}`),
  268. result: &xsql.Tuple{Message: xsql.Message{
  269. "a": []map[string]interface{}(nil),
  270. },
  271. },
  272. },
  273. {
  274. stmt: &xsql.StreamStmt{
  275. Name: xsql.StreamName("demo"),
  276. StreamFields: []xsql.StreamField{
  277. {Name: "a", FieldType: &xsql.ArrayType{
  278. Type: xsql.STRUCT,
  279. FieldType: &xsql.RecType{
  280. StreamFields: []xsql.StreamField{
  281. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  282. },
  283. },
  284. }},
  285. },
  286. },
  287. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  288. result: &xsql.Tuple{Message: xsql.Message{
  289. "a": []map[string]interface{}{
  290. nil,
  291. {"b": "hello2"},
  292. },
  293. },
  294. },
  295. },
  296. {
  297. stmt: &xsql.StreamStmt{
  298. Name: xsql.StreamName("demo"),
  299. StreamFields: []xsql.StreamField{
  300. {Name: "a", FieldType: &xsql.ArrayType{
  301. Type: xsql.ARRAY,
  302. FieldType: &xsql.ArrayType{
  303. Type: xsql.BIGINT,
  304. },
  305. }},
  306. },
  307. },
  308. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  309. result: &xsql.Tuple{Message: xsql.Message{
  310. "a": [][]int{
  311. {50, 60, 70},
  312. {66},
  313. {77},
  314. },
  315. },
  316. },
  317. },
  318. {
  319. stmt: &xsql.StreamStmt{
  320. Name: xsql.StreamName("demo"),
  321. StreamFields: []xsql.StreamField{
  322. {Name: "a", FieldType: &xsql.ArrayType{
  323. Type: xsql.ARRAY,
  324. FieldType: &xsql.ArrayType{
  325. Type: xsql.BIGINT,
  326. },
  327. }},
  328. },
  329. },
  330. data: []byte(`{"a": [null, [66], [77]]}`),
  331. result: &xsql.Tuple{Message: xsql.Message{
  332. "a": [][]int{
  333. []int(nil),
  334. {66},
  335. {77},
  336. },
  337. },
  338. },
  339. },
  340. {
  341. stmt: &xsql.StreamStmt{
  342. Name: xsql.StreamName("demo"),
  343. StreamFields: nil,
  344. },
  345. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  346. result: &xsql.Tuple{Message: xsql.Message{
  347. "a": []interface{}{
  348. map[string]interface{}{"b": "hello1"},
  349. map[string]interface{}{"b": "hello2"},
  350. },
  351. },
  352. },
  353. },
  354. {
  355. stmt: &xsql.StreamStmt{
  356. Name: xsql.StreamName("demo"),
  357. StreamFields: []xsql.StreamField{
  358. {Name: "a", FieldType: &xsql.ArrayType{
  359. Type: xsql.FLOAT,
  360. }},
  361. },
  362. },
  363. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  364. result: &xsql.Tuple{Message: xsql.Message{
  365. "a": []float64{
  366. 55,
  367. 77,
  368. },
  369. },
  370. },
  371. },
  372. {
  373. stmt: &xsql.StreamStmt{
  374. Name: xsql.StreamName("demo"),
  375. StreamFields: nil,
  376. },
  377. data: []byte(`{"a": [55, 77]}`),
  378. result: &xsql.Tuple{Message: xsql.Message{
  379. "a": []interface{}{
  380. float64(55),
  381. float64(77),
  382. },
  383. },
  384. },
  385. },
  386. //Rec of complex type
  387. {
  388. stmt: &xsql.StreamStmt{
  389. Name: xsql.StreamName("demo"),
  390. StreamFields: []xsql.StreamField{
  391. {Name: "a", FieldType: &xsql.RecType{
  392. StreamFields: []xsql.StreamField{
  393. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  394. {Name: "c", FieldType: &xsql.RecType{
  395. StreamFields: []xsql.StreamField{
  396. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  397. },
  398. }},
  399. },
  400. }},
  401. },
  402. },
  403. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  404. result: &xsql.Tuple{Message: xsql.Message{
  405. "a": map[string]interface{}{
  406. "b": "hello",
  407. "c": map[string]interface{}{
  408. "d": int(35),
  409. },
  410. },
  411. },
  412. },
  413. },
  414. {
  415. stmt: &xsql.StreamStmt{
  416. Name: xsql.StreamName("demo"),
  417. StreamFields: []xsql.StreamField{
  418. {Name: "a", FieldType: &xsql.RecType{
  419. StreamFields: []xsql.StreamField{
  420. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  421. {Name: "c", FieldType: &xsql.RecType{
  422. StreamFields: []xsql.StreamField{
  423. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  424. },
  425. }},
  426. },
  427. }},
  428. },
  429. },
  430. data: []byte(`{"a": null}`),
  431. result: &xsql.Tuple{Message: xsql.Message{
  432. "a": map[string]interface{}(nil),
  433. },
  434. },
  435. },
  436. {
  437. stmt: &xsql.StreamStmt{
  438. Name: xsql.StreamName("demo"),
  439. StreamFields: []xsql.StreamField{
  440. {Name: "a", FieldType: &xsql.RecType{
  441. StreamFields: []xsql.StreamField{
  442. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  443. {Name: "c", FieldType: &xsql.ArrayType{
  444. Type: xsql.FLOAT,
  445. }},
  446. },
  447. }},
  448. },
  449. },
  450. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  451. result: &xsql.Tuple{Message: xsql.Message{
  452. "a": map[string]interface{}{
  453. "b": "hello",
  454. "c": []float64{
  455. 35.2, 38.2,
  456. },
  457. },
  458. },
  459. },
  460. },
  461. {
  462. stmt: &xsql.StreamStmt{
  463. Name: xsql.StreamName("demo"),
  464. StreamFields: []xsql.StreamField{
  465. {Name: "a", FieldType: &xsql.RecType{
  466. StreamFields: []xsql.StreamField{
  467. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  468. {Name: "c", FieldType: &xsql.ArrayType{
  469. Type: xsql.FLOAT,
  470. }},
  471. },
  472. }},
  473. },
  474. },
  475. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  476. result: &xsql.Tuple{Message: xsql.Message{
  477. "a": map[string]interface{}{
  478. "b": "hello",
  479. "c": []float64(nil),
  480. },
  481. },
  482. },
  483. },
  484. {
  485. stmt: &xsql.StreamStmt{
  486. Name: xsql.StreamName("demo"),
  487. StreamFields: []xsql.StreamField{
  488. {Name: "a", FieldType: &xsql.RecType{
  489. StreamFields: []xsql.StreamField{
  490. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  491. {Name: "c", FieldType: &xsql.ArrayType{
  492. Type: xsql.FLOAT,
  493. }},
  494. },
  495. }},
  496. },
  497. },
  498. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  499. result: errors.New("error in preprocessor: fail to parse field c: invalid data type for [0], expect float but found <nil>(<nil>)"),
  500. },
  501. {
  502. stmt: &xsql.StreamStmt{
  503. Name: xsql.StreamName("demo"),
  504. StreamFields: nil,
  505. },
  506. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  507. result: &xsql.Tuple{Message: xsql.Message{
  508. "a": map[string]interface{}{
  509. "b": "hello",
  510. "c": map[string]interface{}{
  511. "d": 35.2,
  512. },
  513. },
  514. },
  515. },
  516. },
  517. }
  518. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  519. defer common.CloseLogger()
  520. contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
  521. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  522. for i, tt := range tests {
  523. pp := &Preprocessor{streamFields: convertFields(tt.stmt.StreamFields)}
  524. dm := make(map[string]interface{})
  525. if e := json.Unmarshal(tt.data, &dm); e != nil {
  526. log.Fatal(e)
  527. return
  528. } else {
  529. tuple := &xsql.Tuple{Message: dm}
  530. fv, afv := xsql.NewFunctionValuersForOp(nil)
  531. result := pp.Apply(ctx, tuple, fv, afv)
  532. if !reflect.DeepEqual(tt.result, result) {
  533. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  534. }
  535. }
  536. }
  537. }
  538. func TestPreprocessorTime_Apply(t *testing.T) {
  539. var tests = []struct {
  540. stmt *xsql.StreamStmt
  541. data []byte
  542. result interface{}
  543. }{
  544. {
  545. stmt: &xsql.StreamStmt{
  546. Name: xsql.StreamName("demo"),
  547. StreamFields: []xsql.StreamField{
  548. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  549. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  550. },
  551. },
  552. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  553. result: &xsql.Tuple{Message: xsql.Message{
  554. "abc": common.TimeFromUnixMilli(1568854515000),
  555. "def": common.TimeFromUnixMilli(1568854573431),
  556. },
  557. },
  558. },
  559. {
  560. stmt: &xsql.StreamStmt{
  561. Name: xsql.StreamName("demo"),
  562. StreamFields: nil,
  563. },
  564. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  565. result: &xsql.Tuple{Message: xsql.Message{
  566. "abc": "2019-09-19T00:55:15.000Z",
  567. "def": float64(1568854573431),
  568. },
  569. },
  570. },
  571. {
  572. stmt: &xsql.StreamStmt{
  573. Name: xsql.StreamName("demo"),
  574. StreamFields: []xsql.StreamField{
  575. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  576. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  577. },
  578. },
  579. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  580. result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  581. },
  582. {
  583. stmt: &xsql.StreamStmt{
  584. Name: xsql.StreamName("demo"),
  585. StreamFields: []xsql.StreamField{
  586. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  587. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  588. },
  589. Options: map[string]string{
  590. "DATASOURCE": "users",
  591. "FORMAT": "JSON",
  592. "KEY": "USERID",
  593. "CONF_KEY": "srv1",
  594. "TYPE": "MQTT",
  595. "TIMESTAMP": "USERID",
  596. "TIMESTAMP_FORMAT": "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  597. },
  598. },
  599. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  600. result: &xsql.Tuple{Message: xsql.Message{
  601. "abc": common.TimeFromUnixMilli(1568894115000),
  602. "def": common.TimeFromUnixMilli(1568854573431),
  603. }},
  604. },
  605. //Array type
  606. {
  607. stmt: &xsql.StreamStmt{
  608. Name: xsql.StreamName("demo"),
  609. StreamFields: []xsql.StreamField{
  610. {Name: "a", FieldType: &xsql.ArrayType{
  611. Type: xsql.DATETIME,
  612. }},
  613. },
  614. },
  615. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  616. result: &xsql.Tuple{Message: xsql.Message{
  617. "a": []time.Time{
  618. common.TimeFromUnixMilli(1568854515123),
  619. common.TimeFromUnixMilli(1568854573431),
  620. },
  621. },
  622. },
  623. },
  624. {
  625. stmt: &xsql.StreamStmt{
  626. Name: xsql.StreamName("demo"),
  627. StreamFields: []xsql.StreamField{
  628. {Name: "a", FieldType: &xsql.RecType{
  629. StreamFields: []xsql.StreamField{
  630. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  631. {Name: "c", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  632. },
  633. }},
  634. },
  635. },
  636. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  637. result: &xsql.Tuple{Message: xsql.Message{
  638. "a": map[string]interface{}{
  639. "b": "hello",
  640. "c": common.TimeFromUnixMilli(1568854515000),
  641. },
  642. },
  643. },
  644. },
  645. }
  646. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  647. defer common.CloseLogger()
  648. contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
  649. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  650. for i, tt := range tests {
  651. pp := &Preprocessor{streamFields: convertFields(tt.stmt.StreamFields), timestampFormat: tt.stmt.Options["TIMESTAMP_FORMAT"]}
  652. dm := make(map[string]interface{})
  653. if e := json.Unmarshal(tt.data, &dm); e != nil {
  654. log.Fatal(e)
  655. return
  656. } else {
  657. tuple := &xsql.Tuple{Message: dm}
  658. fv, afv := xsql.NewFunctionValuersForOp(nil)
  659. result := pp.Apply(ctx, tuple, fv, afv)
  660. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  661. if rt, ok := result.(*xsql.Tuple); ok {
  662. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  663. rt.Message["abc"] = rtt.UTC()
  664. }
  665. }
  666. if !reflect.DeepEqual(tt.result, result) {
  667. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  668. }
  669. }
  670. }
  671. }
  672. func convertFields(o xsql.StreamFields) []interface{} {
  673. if o == nil {
  674. return nil
  675. }
  676. fields := make([]interface{}, len(o))
  677. for i, _ := range o {
  678. fields[i] = &o[i]
  679. }
  680. return fields
  681. }
  682. func TestPreprocessorEventtime_Apply(t *testing.T) {
  683. var tests = []struct {
  684. stmt *xsql.StreamStmt
  685. data []byte
  686. result interface{}
  687. }{
  688. //Basic type
  689. {
  690. stmt: &xsql.StreamStmt{
  691. Name: xsql.StreamName("demo"),
  692. StreamFields: []xsql.StreamField{
  693. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  694. },
  695. Options: map[string]string{
  696. "DATASOURCE": "users",
  697. "FORMAT": "JSON",
  698. "KEY": "USERID",
  699. "CONF_KEY": "srv1",
  700. "TYPE": "MQTT",
  701. "TIMESTAMP": "abc",
  702. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  703. },
  704. },
  705. data: []byte(`{"abc": 1568854515000}`),
  706. result: &xsql.Tuple{Message: xsql.Message{
  707. "abc": int(1568854515000),
  708. }, Timestamp: 1568854515000,
  709. },
  710. },
  711. {
  712. stmt: &xsql.StreamStmt{
  713. Name: xsql.StreamName("demo"),
  714. StreamFields: nil,
  715. Options: map[string]string{
  716. "DATASOURCE": "users",
  717. "FORMAT": "JSON",
  718. "KEY": "USERID",
  719. "CONF_KEY": "srv1",
  720. "TYPE": "MQTT",
  721. "TIMESTAMP": "abc",
  722. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  723. },
  724. },
  725. data: []byte(`{"abc": 1568854515000}`),
  726. result: &xsql.Tuple{Message: xsql.Message{
  727. "abc": float64(1568854515000),
  728. }, Timestamp: 1568854515000,
  729. },
  730. },
  731. {
  732. stmt: &xsql.StreamStmt{
  733. Name: xsql.StreamName("demo"),
  734. StreamFields: []xsql.StreamField{
  735. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  736. },
  737. Options: map[string]string{
  738. "DATASOURCE": "users",
  739. "TIMESTAMP": "abc",
  740. },
  741. },
  742. data: []byte(`{"abc": true}`),
  743. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  744. },
  745. {
  746. stmt: &xsql.StreamStmt{
  747. Name: xsql.StreamName("demo"),
  748. StreamFields: []xsql.StreamField{
  749. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  750. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  751. },
  752. Options: map[string]string{
  753. "DATASOURCE": "users",
  754. "TIMESTAMP": "def",
  755. },
  756. },
  757. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  758. result: &xsql.Tuple{Message: xsql.Message{
  759. "abc": float64(34),
  760. "def": "2019-09-23T02:47:29.754Z",
  761. }, Timestamp: int64(1569206849754),
  762. },
  763. },
  764. {
  765. stmt: &xsql.StreamStmt{
  766. Name: xsql.StreamName("demo"),
  767. StreamFields: []xsql.StreamField{
  768. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  769. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  770. },
  771. Options: map[string]string{
  772. "DATASOURCE": "users",
  773. "TIMESTAMP": "abc",
  774. },
  775. },
  776. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  777. result: &xsql.Tuple{Message: xsql.Message{
  778. "abc": common.TimeFromUnixMilli(1568854515000),
  779. "def": common.TimeFromUnixMilli(1568854573431),
  780. }, Timestamp: int64(1568854515000),
  781. },
  782. },
  783. {
  784. stmt: &xsql.StreamStmt{
  785. Name: xsql.StreamName("demo"),
  786. StreamFields: []xsql.StreamField{
  787. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  788. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  789. },
  790. Options: map[string]string{
  791. "DATASOURCE": "users",
  792. "TIMESTAMP": "def",
  793. "TIMESTAMP_FORMAT": "yyyy-MM-dd'AT'HH:mm:ss",
  794. },
  795. },
  796. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  797. result: &xsql.Tuple{Message: xsql.Message{
  798. "abc": float64(34),
  799. "def": "2019-09-23AT02:47:29",
  800. }, Timestamp: int64(1569206849000),
  801. },
  802. },
  803. {
  804. stmt: &xsql.StreamStmt{
  805. Name: xsql.StreamName("demo"),
  806. StreamFields: []xsql.StreamField{
  807. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  808. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  809. },
  810. Options: map[string]string{
  811. "DATASOURCE": "users",
  812. "TIMESTAMP": "def",
  813. "TIMESTAMP_FORMAT": "yyyy-MM-ddaHH:mm:ss",
  814. },
  815. },
  816. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  817. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
  818. },
  819. }
  820. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  821. defer common.CloseLogger()
  822. contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  823. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  824. for i, tt := range tests {
  825. pp := &Preprocessor{
  826. streamFields: convertFields(tt.stmt.StreamFields),
  827. aliasFields: nil,
  828. isEventTime: true,
  829. timestampField: tt.stmt.Options["TIMESTAMP"],
  830. timestampFormat: tt.stmt.Options["TIMESTAMP_FORMAT"],
  831. isBinary: false,
  832. }
  833. dm := make(map[string]interface{})
  834. if e := json.Unmarshal(tt.data, &dm); e != nil {
  835. log.Fatal(e)
  836. return
  837. } else {
  838. tuple := &xsql.Tuple{Message: dm}
  839. fv, afv := xsql.NewFunctionValuersForOp(nil)
  840. result := pp.Apply(ctx, tuple, fv, afv)
  841. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  842. if rt, ok := result.(*xsql.Tuple); ok {
  843. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  844. rt.Message["abc"] = rtt.UTC()
  845. }
  846. }
  847. if !reflect.DeepEqual(tt.result, result) {
  848. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  849. }
  850. }
  851. }
  852. }
  853. func TestPreprocessorError(t *testing.T) {
  854. tests := []struct {
  855. stmt *xsql.StreamStmt
  856. data []byte
  857. result interface{}
  858. }{
  859. {
  860. stmt: &xsql.StreamStmt{
  861. Name: xsql.StreamName("demo"),
  862. StreamFields: []xsql.StreamField{
  863. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  864. },
  865. },
  866. data: []byte(`{"abc": "dafsad"}`),
  867. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
  868. }, {
  869. stmt: &xsql.StreamStmt{
  870. Name: xsql.StreamName("demo"),
  871. StreamFields: []xsql.StreamField{
  872. {Name: "a", FieldType: &xsql.RecType{
  873. StreamFields: []xsql.StreamField{
  874. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  875. },
  876. }},
  877. },
  878. },
  879. data: []byte(`{"a": {"d" : "hello"}}`),
  880. result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
  881. }, {
  882. stmt: &xsql.StreamStmt{
  883. Name: xsql.StreamName("demo"),
  884. StreamFields: []xsql.StreamField{
  885. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  886. },
  887. Options: map[string]string{
  888. "DATASOURCE": "users",
  889. "FORMAT": "JSON",
  890. "KEY": "USERID",
  891. "CONF_KEY": "srv1",
  892. "TYPE": "MQTT",
  893. "TIMESTAMP": "abc",
  894. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  895. },
  896. },
  897. data: []byte(`{"abc": "not a time"}`),
  898. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
  899. },
  900. }
  901. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  902. defer common.CloseLogger()
  903. contextLogger := common.Log.WithField("rule", "TestPreprocessorError")
  904. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  905. for i, tt := range tests {
  906. pp := &Preprocessor{streamFields: convertFields(tt.stmt.StreamFields)}
  907. dm := make(map[string]interface{})
  908. if e := json.Unmarshal(tt.data, &dm); e != nil {
  909. log.Fatal(e)
  910. return
  911. } else {
  912. tuple := &xsql.Tuple{Message: dm}
  913. fv, afv := xsql.NewFunctionValuersForOp(nil)
  914. result := pp.Apply(ctx, tuple, fv, afv)
  915. if !reflect.DeepEqual(tt.result, result) {
  916. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  917. }
  918. }
  919. }
  920. }
  921. func TestPreprocessorForBinary(t *testing.T) {
  922. docsFolder, err := common.GetLoc("/docs/")
  923. if err != nil {
  924. t.Errorf("Cannot find docs folder: %v", err)
  925. }
  926. image, err := ioutil.ReadFile(path.Join(docsFolder, "cover.jpg"))
  927. if err != nil {
  928. t.Errorf("Cannot read image: %v", err)
  929. }
  930. b64img := base64.StdEncoding.EncodeToString(image)
  931. //TODO test bytea type conversion to string or else
  932. var tests = []struct {
  933. stmt *xsql.StreamStmt
  934. data []byte
  935. isBinary bool
  936. result interface{}
  937. }{
  938. {
  939. stmt: &xsql.StreamStmt{
  940. Name: xsql.StreamName("demo"),
  941. StreamFields: nil,
  942. },
  943. data: image,
  944. isBinary: true,
  945. result: &xsql.Tuple{Message: xsql.Message{
  946. "self": image,
  947. },
  948. },
  949. },
  950. {
  951. stmt: &xsql.StreamStmt{
  952. Name: xsql.StreamName("demo"),
  953. StreamFields: []xsql.StreamField{
  954. {Name: "img", FieldType: &xsql.BasicType{Type: xsql.BYTEA}},
  955. },
  956. },
  957. data: image,
  958. isBinary: true,
  959. result: &xsql.Tuple{Message: xsql.Message{
  960. "img": image,
  961. },
  962. },
  963. },
  964. {
  965. stmt: &xsql.StreamStmt{
  966. Name: xsql.StreamName("demo"),
  967. StreamFields: []xsql.StreamField{
  968. {Name: "a", FieldType: &xsql.RecType{
  969. StreamFields: []xsql.StreamField{
  970. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.BYTEA}},
  971. },
  972. }},
  973. },
  974. },
  975. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  976. result: &xsql.Tuple{Message: xsql.Message{
  977. "a": map[string]interface{}{
  978. "b": image,
  979. },
  980. },
  981. },
  982. },
  983. {
  984. stmt: &xsql.StreamStmt{
  985. Name: xsql.StreamName("demo"),
  986. StreamFields: []xsql.StreamField{
  987. {Name: "a", FieldType: &xsql.ArrayType{
  988. Type: xsql.BYTEA,
  989. }},
  990. },
  991. },
  992. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  993. result: &xsql.Tuple{Message: xsql.Message{
  994. "a": [][]byte{
  995. image,
  996. },
  997. },
  998. },
  999. },
  1000. {
  1001. stmt: &xsql.StreamStmt{
  1002. Name: xsql.StreamName("demo"),
  1003. StreamFields: []xsql.StreamField{
  1004. {Name: "a", FieldType: &xsql.ArrayType{
  1005. Type: xsql.STRUCT,
  1006. FieldType: &xsql.RecType{
  1007. StreamFields: []xsql.StreamField{
  1008. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.BYTEA}},
  1009. },
  1010. },
  1011. }},
  1012. },
  1013. },
  1014. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  1015. result: &xsql.Tuple{Message: xsql.Message{
  1016. "a": []map[string]interface{}{
  1017. {"b": image},
  1018. },
  1019. },
  1020. },
  1021. },
  1022. }
  1023. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1024. defer common.CloseLogger()
  1025. contextLogger := common.Log.WithField("rule", "TestPreprocessorForBinary")
  1026. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1027. for i, tt := range tests {
  1028. pp := &Preprocessor{streamFields: convertFields(tt.stmt.StreamFields), isBinary: tt.isBinary}
  1029. format := "json"
  1030. if tt.isBinary {
  1031. format = "binary"
  1032. }
  1033. if dm, e := common.MessageDecode(tt.data, format); e != nil {
  1034. log.Fatal(e)
  1035. return
  1036. } else {
  1037. tuple := &xsql.Tuple{Message: dm}
  1038. fv, afv := xsql.NewFunctionValuersForOp(nil)
  1039. result := pp.Apply(ctx, tuple, fv, afv)
  1040. if !reflect.DeepEqual(tt.result, result) {
  1041. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  1042. }
  1043. }
  1044. }
  1045. }