preprocessor_test.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082
  1. package operators
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xsql"
  9. "github.com/emqx/kuiper/xstream/contexts"
  10. "io/ioutil"
  11. "log"
  12. "path"
  13. "reflect"
  14. "testing"
  15. "time"
  16. )
  17. func TestPreprocessor_Apply(t *testing.T) {
  18. var tests = []struct {
  19. stmt *xsql.StreamStmt
  20. data []byte
  21. result interface{}
  22. }{
  23. //Basic type
  24. {
  25. stmt: &xsql.StreamStmt{
  26. Name: xsql.StreamName("demo"),
  27. StreamFields: []xsql.StreamField{
  28. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  29. },
  30. },
  31. data: []byte(`{"a": 6}`),
  32. result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
  33. },
  34. {
  35. stmt: &xsql.StreamStmt{
  36. Name: xsql.StreamName("demo"),
  37. StreamFields: []xsql.StreamField{
  38. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  39. },
  40. },
  41. data: []byte(`{"abc": null}`),
  42. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found <nil>(<nil>)"),
  43. },
  44. {
  45. stmt: &xsql.StreamStmt{
  46. Name: xsql.StreamName("demo"),
  47. StreamFields: nil,
  48. },
  49. data: []byte(`{"a": 6}`),
  50. result: &xsql.Tuple{Message: xsql.Message{
  51. "a": float64(6),
  52. },
  53. },
  54. },
  55. {
  56. stmt: &xsql.StreamStmt{
  57. Name: xsql.StreamName("demo"),
  58. StreamFields: []xsql.StreamField{
  59. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  60. },
  61. },
  62. data: []byte(`{"abc": 6}`),
  63. result: &xsql.Tuple{Message: xsql.Message{
  64. "abc": 6,
  65. },
  66. },
  67. },
  68. {
  69. stmt: &xsql.StreamStmt{
  70. Name: xsql.StreamName("demo"),
  71. StreamFields: nil,
  72. },
  73. data: []byte(`{"abc": 6}`),
  74. result: &xsql.Tuple{Message: xsql.Message{
  75. "abc": float64(6),
  76. },
  77. },
  78. },
  79. {
  80. stmt: &xsql.StreamStmt{
  81. Name: xsql.StreamName("demo"),
  82. StreamFields: []xsql.StreamField{
  83. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  84. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  85. },
  86. },
  87. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  88. result: &xsql.Tuple{Message: xsql.Message{
  89. "abc": float64(34),
  90. "def": "hello",
  91. },
  92. },
  93. },
  94. {
  95. stmt: &xsql.StreamStmt{
  96. Name: xsql.StreamName("demo"),
  97. StreamFields: nil,
  98. },
  99. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  100. result: &xsql.Tuple{Message: xsql.Message{
  101. "abc": float64(34),
  102. "def": "hello",
  103. "ghi": float64(50),
  104. },
  105. },
  106. },
  107. {
  108. stmt: &xsql.StreamStmt{
  109. Name: xsql.StreamName("demo"),
  110. StreamFields: []xsql.StreamField{
  111. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  112. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  113. },
  114. },
  115. data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
  116. result: &xsql.Tuple{Message: xsql.Message{
  117. "abc": float64(34),
  118. "def": "hello",
  119. },
  120. },
  121. },
  122. {
  123. stmt: &xsql.StreamStmt{
  124. Name: xsql.StreamName("demo"),
  125. StreamFields: []xsql.StreamField{
  126. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  127. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  128. },
  129. },
  130. data: []byte(`{"abc": 77, "def" : "hello"}`),
  131. result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
  132. },
  133. {
  134. stmt: &xsql.StreamStmt{
  135. Name: xsql.StreamName("demo"),
  136. StreamFields: []xsql.StreamField{
  137. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  138. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  139. },
  140. },
  141. data: []byte(`{"a": {"b" : "hello"}}`),
  142. result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
  143. },
  144. {
  145. stmt: &xsql.StreamStmt{
  146. Name: xsql.StreamName("demo"),
  147. StreamFields: nil,
  148. },
  149. data: []byte(`{"a": {"b" : "hello"}}`),
  150. result: &xsql.Tuple{Message: xsql.Message{
  151. "a": map[string]interface{}{
  152. "b": "hello",
  153. },
  154. },
  155. },
  156. },
  157. //Rec type
  158. {
  159. stmt: &xsql.StreamStmt{
  160. Name: xsql.StreamName("demo"),
  161. StreamFields: []xsql.StreamField{
  162. {Name: "a", FieldType: &xsql.RecType{
  163. StreamFields: []xsql.StreamField{
  164. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  165. },
  166. }},
  167. },
  168. },
  169. data: []byte(`{"a": {"b" : "hello"}}`),
  170. result: &xsql.Tuple{Message: xsql.Message{
  171. "a": map[string]interface{}{
  172. "b": "hello",
  173. },
  174. },
  175. },
  176. },
  177. {
  178. stmt: &xsql.StreamStmt{
  179. Name: xsql.StreamName("demo"),
  180. StreamFields: []xsql.StreamField{
  181. {Name: "a", FieldType: &xsql.RecType{
  182. StreamFields: []xsql.StreamField{
  183. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  184. },
  185. }},
  186. },
  187. },
  188. data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
  189. result: &xsql.Tuple{Message: xsql.Message{
  190. "a": map[string]interface{}{
  191. "b": float64(32),
  192. },
  193. },
  194. },
  195. },
  196. {
  197. stmt: &xsql.StreamStmt{
  198. Name: xsql.StreamName("demo"),
  199. StreamFields: nil,
  200. },
  201. data: []byte(`{"a": {"b" : "32"}}`),
  202. result: &xsql.Tuple{Message: xsql.Message{
  203. "a": map[string]interface{}{
  204. "b": "32",
  205. },
  206. },
  207. },
  208. },
  209. //Array of complex type
  210. {
  211. stmt: &xsql.StreamStmt{
  212. Name: xsql.StreamName("demo"),
  213. StreamFields: []xsql.StreamField{
  214. {Name: "a", FieldType: &xsql.ArrayType{
  215. Type: xsql.STRUCT,
  216. FieldType: &xsql.RecType{
  217. StreamFields: []xsql.StreamField{
  218. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  219. },
  220. },
  221. }},
  222. },
  223. },
  224. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  225. result: &xsql.Tuple{Message: xsql.Message{
  226. "a": []map[string]interface{}{
  227. {"b": "hello1"},
  228. {"b": "hello2"},
  229. },
  230. },
  231. },
  232. },
  233. {
  234. stmt: &xsql.StreamStmt{
  235. Name: xsql.StreamName("demo"),
  236. StreamFields: []xsql.StreamField{
  237. {Name: "a", FieldType: &xsql.ArrayType{
  238. Type: xsql.STRUCT,
  239. FieldType: &xsql.RecType{
  240. StreamFields: []xsql.StreamField{
  241. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  242. },
  243. },
  244. }},
  245. },
  246. },
  247. data: []byte(`{"a": []}`),
  248. result: &xsql.Tuple{Message: xsql.Message{
  249. "a": make([]map[string]interface{}, 0),
  250. },
  251. },
  252. },
  253. {
  254. stmt: &xsql.StreamStmt{
  255. Name: xsql.StreamName("demo"),
  256. StreamFields: []xsql.StreamField{
  257. {Name: "a", FieldType: &xsql.ArrayType{
  258. Type: xsql.STRUCT,
  259. FieldType: &xsql.RecType{
  260. StreamFields: []xsql.StreamField{
  261. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  262. },
  263. },
  264. }},
  265. },
  266. },
  267. data: []byte(`{"a": null}`),
  268. result: &xsql.Tuple{Message: xsql.Message{
  269. "a": []map[string]interface{}(nil),
  270. },
  271. },
  272. },
  273. {
  274. stmt: &xsql.StreamStmt{
  275. Name: xsql.StreamName("demo"),
  276. StreamFields: []xsql.StreamField{
  277. {Name: "a", FieldType: &xsql.ArrayType{
  278. Type: xsql.STRUCT,
  279. FieldType: &xsql.RecType{
  280. StreamFields: []xsql.StreamField{
  281. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  282. },
  283. },
  284. }},
  285. },
  286. },
  287. data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
  288. result: &xsql.Tuple{Message: xsql.Message{
  289. "a": []map[string]interface{}{
  290. nil,
  291. {"b": "hello2"},
  292. },
  293. },
  294. },
  295. },
  296. {
  297. stmt: &xsql.StreamStmt{
  298. Name: xsql.StreamName("demo"),
  299. StreamFields: []xsql.StreamField{
  300. {Name: "a", FieldType: &xsql.ArrayType{
  301. Type: xsql.ARRAY,
  302. FieldType: &xsql.ArrayType{
  303. Type: xsql.BIGINT,
  304. },
  305. }},
  306. },
  307. },
  308. data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
  309. result: &xsql.Tuple{Message: xsql.Message{
  310. "a": [][]int{
  311. {50, 60, 70},
  312. {66},
  313. {77},
  314. },
  315. },
  316. },
  317. },
  318. {
  319. stmt: &xsql.StreamStmt{
  320. Name: xsql.StreamName("demo"),
  321. StreamFields: []xsql.StreamField{
  322. {Name: "a", FieldType: &xsql.ArrayType{
  323. Type: xsql.ARRAY,
  324. FieldType: &xsql.ArrayType{
  325. Type: xsql.BIGINT,
  326. },
  327. }},
  328. },
  329. },
  330. data: []byte(`{"a": [null, [66], [77]]}`),
  331. result: &xsql.Tuple{Message: xsql.Message{
  332. "a": [][]int{
  333. []int(nil),
  334. {66},
  335. {77},
  336. },
  337. },
  338. },
  339. },
  340. {
  341. stmt: &xsql.StreamStmt{
  342. Name: xsql.StreamName("demo"),
  343. StreamFields: nil,
  344. },
  345. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  346. result: &xsql.Tuple{Message: xsql.Message{
  347. "a": []interface{}{
  348. map[string]interface{}{"b": "hello1"},
  349. map[string]interface{}{"b": "hello2"},
  350. },
  351. },
  352. },
  353. },
  354. {
  355. stmt: &xsql.StreamStmt{
  356. Name: xsql.StreamName("demo"),
  357. StreamFields: []xsql.StreamField{
  358. {Name: "a", FieldType: &xsql.ArrayType{
  359. Type: xsql.FLOAT,
  360. }},
  361. },
  362. },
  363. data: []byte(`{"a": "[\"55\", \"77\"]"}`),
  364. result: &xsql.Tuple{Message: xsql.Message{
  365. "a": []float64{
  366. 55,
  367. 77,
  368. },
  369. },
  370. },
  371. },
  372. {
  373. stmt: &xsql.StreamStmt{
  374. Name: xsql.StreamName("demo"),
  375. StreamFields: nil,
  376. },
  377. data: []byte(`{"a": [55, 77]}`),
  378. result: &xsql.Tuple{Message: xsql.Message{
  379. "a": []interface{}{
  380. float64(55),
  381. float64(77),
  382. },
  383. },
  384. },
  385. },
  386. //Rec of complex type
  387. {
  388. stmt: &xsql.StreamStmt{
  389. Name: xsql.StreamName("demo"),
  390. StreamFields: []xsql.StreamField{
  391. {Name: "a", FieldType: &xsql.RecType{
  392. StreamFields: []xsql.StreamField{
  393. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  394. {Name: "c", FieldType: &xsql.RecType{
  395. StreamFields: []xsql.StreamField{
  396. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  397. },
  398. }},
  399. },
  400. }},
  401. },
  402. },
  403. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  404. result: &xsql.Tuple{Message: xsql.Message{
  405. "a": map[string]interface{}{
  406. "b": "hello",
  407. "c": map[string]interface{}{
  408. "d": int(35),
  409. },
  410. },
  411. },
  412. },
  413. },
  414. {
  415. stmt: &xsql.StreamStmt{
  416. Name: xsql.StreamName("demo"),
  417. StreamFields: []xsql.StreamField{
  418. {Name: "a", FieldType: &xsql.RecType{
  419. StreamFields: []xsql.StreamField{
  420. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  421. {Name: "c", FieldType: &xsql.RecType{
  422. StreamFields: []xsql.StreamField{
  423. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  424. },
  425. }},
  426. },
  427. }},
  428. },
  429. },
  430. data: []byte(`{"a": null}`),
  431. result: &xsql.Tuple{Message: xsql.Message{
  432. "a": map[string]interface{}(nil),
  433. },
  434. },
  435. },
  436. {
  437. stmt: &xsql.StreamStmt{
  438. Name: xsql.StreamName("demo"),
  439. StreamFields: []xsql.StreamField{
  440. {Name: "a", FieldType: &xsql.RecType{
  441. StreamFields: []xsql.StreamField{
  442. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  443. {Name: "c", FieldType: &xsql.ArrayType{
  444. Type: xsql.FLOAT,
  445. }},
  446. },
  447. }},
  448. },
  449. },
  450. data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
  451. result: &xsql.Tuple{Message: xsql.Message{
  452. "a": map[string]interface{}{
  453. "b": "hello",
  454. "c": []float64{
  455. 35.2, 38.2,
  456. },
  457. },
  458. },
  459. },
  460. },
  461. {
  462. stmt: &xsql.StreamStmt{
  463. Name: xsql.StreamName("demo"),
  464. StreamFields: []xsql.StreamField{
  465. {Name: "a", FieldType: &xsql.RecType{
  466. StreamFields: []xsql.StreamField{
  467. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  468. {Name: "c", FieldType: &xsql.ArrayType{
  469. Type: xsql.FLOAT,
  470. }},
  471. },
  472. }},
  473. },
  474. },
  475. data: []byte(`{"a": {"b" : "hello", "c": null}}`),
  476. result: &xsql.Tuple{Message: xsql.Message{
  477. "a": map[string]interface{}{
  478. "b": "hello",
  479. "c": []float64(nil),
  480. },
  481. },
  482. },
  483. },
  484. {
  485. stmt: &xsql.StreamStmt{
  486. Name: xsql.StreamName("demo"),
  487. StreamFields: []xsql.StreamField{
  488. {Name: "a", FieldType: &xsql.RecType{
  489. StreamFields: []xsql.StreamField{
  490. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  491. {Name: "c", FieldType: &xsql.ArrayType{
  492. Type: xsql.FLOAT,
  493. }},
  494. },
  495. }},
  496. },
  497. },
  498. data: []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
  499. result: errors.New("error in preprocessor: fail to parse field c: invalid data type for [0], expect float but found <nil>(<nil>)"),
  500. },
  501. {
  502. stmt: &xsql.StreamStmt{
  503. Name: xsql.StreamName("demo"),
  504. StreamFields: nil,
  505. },
  506. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  507. result: &xsql.Tuple{Message: xsql.Message{
  508. "a": map[string]interface{}{
  509. "b": "hello",
  510. "c": map[string]interface{}{
  511. "d": 35.2,
  512. },
  513. },
  514. },
  515. },
  516. },
  517. }
  518. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  519. defer common.CloseLogger()
  520. contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
  521. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  522. for i, tt := range tests {
  523. pp := &Preprocessor{}
  524. pp.streamFields = convertFields(tt.stmt.StreamFields)
  525. dm := make(map[string]interface{})
  526. if e := json.Unmarshal(tt.data, &dm); e != nil {
  527. log.Fatal(e)
  528. return
  529. } else {
  530. tuple := &xsql.Tuple{Message: dm}
  531. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  532. result := pp.Apply(ctx, tuple, fv, afv)
  533. if !reflect.DeepEqual(tt.result, result) {
  534. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  535. }
  536. }
  537. }
  538. }
  539. func TestPreprocessorTime_Apply(t *testing.T) {
  540. var tests = []struct {
  541. stmt *xsql.StreamStmt
  542. data []byte
  543. result interface{}
  544. }{
  545. {
  546. stmt: &xsql.StreamStmt{
  547. Name: xsql.StreamName("demo"),
  548. StreamFields: []xsql.StreamField{
  549. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  550. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  551. },
  552. },
  553. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  554. result: &xsql.Tuple{Message: xsql.Message{
  555. "abc": common.TimeFromUnixMilli(1568854515000),
  556. "def": common.TimeFromUnixMilli(1568854573431),
  557. },
  558. },
  559. },
  560. {
  561. stmt: &xsql.StreamStmt{
  562. Name: xsql.StreamName("demo"),
  563. StreamFields: nil,
  564. },
  565. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  566. result: &xsql.Tuple{Message: xsql.Message{
  567. "abc": "2019-09-19T00:55:15.000Z",
  568. "def": float64(1568854573431),
  569. },
  570. },
  571. },
  572. {
  573. stmt: &xsql.StreamStmt{
  574. Name: xsql.StreamName("demo"),
  575. StreamFields: []xsql.StreamField{
  576. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  577. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  578. },
  579. },
  580. data: []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
  581. result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
  582. },
  583. {
  584. stmt: &xsql.StreamStmt{
  585. Name: xsql.StreamName("demo"),
  586. StreamFields: []xsql.StreamField{
  587. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  588. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  589. },
  590. Options: &xsql.Options{
  591. DATASOURCE: "users",
  592. FORMAT: "JSON",
  593. KEY: "USERID",
  594. CONF_KEY: "srv1",
  595. TYPE: "MQTT",
  596. TIMESTAMP: "USERID",
  597. TIMESTAMP_FORMAT: "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
  598. },
  599. },
  600. data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
  601. result: &xsql.Tuple{Message: xsql.Message{
  602. "abc": common.TimeFromUnixMilli(1568894115000),
  603. "def": common.TimeFromUnixMilli(1568854573431),
  604. }},
  605. },
  606. //Array type
  607. {
  608. stmt: &xsql.StreamStmt{
  609. Name: xsql.StreamName("demo"),
  610. StreamFields: []xsql.StreamField{
  611. {Name: "a", FieldType: &xsql.ArrayType{
  612. Type: xsql.DATETIME,
  613. }},
  614. },
  615. },
  616. data: []byte(`{"a": [1568854515123, 1568854573431]}`),
  617. result: &xsql.Tuple{Message: xsql.Message{
  618. "a": []time.Time{
  619. common.TimeFromUnixMilli(1568854515123),
  620. common.TimeFromUnixMilli(1568854573431),
  621. },
  622. },
  623. },
  624. },
  625. {
  626. stmt: &xsql.StreamStmt{
  627. Name: xsql.StreamName("demo"),
  628. StreamFields: []xsql.StreamField{
  629. {Name: "a", FieldType: &xsql.RecType{
  630. StreamFields: []xsql.StreamField{
  631. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  632. {Name: "c", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  633. },
  634. }},
  635. },
  636. },
  637. data: []byte(`{"a": {"b" : "hello", "c": 1568854515000}}`),
  638. result: &xsql.Tuple{Message: xsql.Message{
  639. "a": map[string]interface{}{
  640. "b": "hello",
  641. "c": common.TimeFromUnixMilli(1568854515000),
  642. },
  643. },
  644. },
  645. },
  646. }
  647. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  648. defer common.CloseLogger()
  649. contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
  650. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  651. for i, tt := range tests {
  652. pp := &Preprocessor{}
  653. pp.streamFields = convertFields(tt.stmt.StreamFields)
  654. if tt.stmt.Options != nil {
  655. pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
  656. }
  657. dm := make(map[string]interface{})
  658. if e := json.Unmarshal(tt.data, &dm); e != nil {
  659. log.Fatal(e)
  660. return
  661. } else {
  662. tuple := &xsql.Tuple{Message: dm}
  663. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  664. result := pp.Apply(ctx, tuple, fv, afv)
  665. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  666. if rt, ok := result.(*xsql.Tuple); ok {
  667. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  668. rt.Message["abc"] = rtt.UTC()
  669. }
  670. }
  671. if !reflect.DeepEqual(tt.result, result) {
  672. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  673. }
  674. }
  675. }
  676. }
  677. func convertFields(o xsql.StreamFields) []interface{} {
  678. if o == nil {
  679. return nil
  680. }
  681. fields := make([]interface{}, len(o))
  682. for i, _ := range o {
  683. fields[i] = &o[i]
  684. }
  685. return fields
  686. }
  687. func TestPreprocessorEventtime_Apply(t *testing.T) {
  688. var tests = []struct {
  689. stmt *xsql.StreamStmt
  690. data []byte
  691. result interface{}
  692. }{
  693. //Basic type
  694. {
  695. stmt: &xsql.StreamStmt{
  696. Name: xsql.StreamName("demo"),
  697. StreamFields: []xsql.StreamField{
  698. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  699. },
  700. Options: &xsql.Options{
  701. DATASOURCE: "users",
  702. FORMAT: "JSON",
  703. KEY: "USERID",
  704. CONF_KEY: "srv1",
  705. TYPE: "MQTT",
  706. TIMESTAMP: "abc",
  707. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  708. },
  709. },
  710. data: []byte(`{"abc": 1568854515000}`),
  711. result: &xsql.Tuple{Message: xsql.Message{
  712. "abc": int(1568854515000),
  713. }, Timestamp: 1568854515000,
  714. },
  715. },
  716. {
  717. stmt: &xsql.StreamStmt{
  718. Name: xsql.StreamName("demo"),
  719. StreamFields: nil,
  720. Options: &xsql.Options{
  721. DATASOURCE: "users",
  722. FORMAT: "JSON",
  723. KEY: "USERID",
  724. CONF_KEY: "srv1",
  725. TYPE: "MQTT",
  726. TIMESTAMP: "abc",
  727. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  728. },
  729. },
  730. data: []byte(`{"abc": 1568854515000}`),
  731. result: &xsql.Tuple{Message: xsql.Message{
  732. "abc": float64(1568854515000),
  733. }, Timestamp: 1568854515000,
  734. },
  735. },
  736. {
  737. stmt: &xsql.StreamStmt{
  738. Name: xsql.StreamName("demo"),
  739. StreamFields: []xsql.StreamField{
  740. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  741. },
  742. Options: &xsql.Options{
  743. DATASOURCE: "users",
  744. TIMESTAMP: "abc",
  745. },
  746. },
  747. data: []byte(`{"abc": true}`),
  748. result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
  749. },
  750. {
  751. stmt: &xsql.StreamStmt{
  752. Name: xsql.StreamName("demo"),
  753. StreamFields: []xsql.StreamField{
  754. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  755. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  756. },
  757. Options: &xsql.Options{
  758. DATASOURCE: "users",
  759. TIMESTAMP: "def",
  760. },
  761. },
  762. data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
  763. result: &xsql.Tuple{Message: xsql.Message{
  764. "abc": float64(34),
  765. "def": "2019-09-23T02:47:29.754Z",
  766. }, Timestamp: int64(1569206849754),
  767. },
  768. },
  769. {
  770. stmt: &xsql.StreamStmt{
  771. Name: xsql.StreamName("demo"),
  772. StreamFields: []xsql.StreamField{
  773. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  774. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
  775. },
  776. Options: &xsql.Options{
  777. DATASOURCE: "users",
  778. TIMESTAMP: "abc",
  779. },
  780. },
  781. data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
  782. result: &xsql.Tuple{Message: xsql.Message{
  783. "abc": common.TimeFromUnixMilli(1568854515000),
  784. "def": common.TimeFromUnixMilli(1568854573431),
  785. }, Timestamp: int64(1568854515000),
  786. },
  787. },
  788. {
  789. stmt: &xsql.StreamStmt{
  790. Name: xsql.StreamName("demo"),
  791. StreamFields: []xsql.StreamField{
  792. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  793. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  794. },
  795. Options: &xsql.Options{
  796. DATASOURCE: "users",
  797. TIMESTAMP: "def",
  798. TIMESTAMP_FORMAT: "yyyy-MM-dd'AT'HH:mm:ss",
  799. },
  800. },
  801. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  802. result: &xsql.Tuple{Message: xsql.Message{
  803. "abc": float64(34),
  804. "def": "2019-09-23AT02:47:29",
  805. }, Timestamp: int64(1569206849000),
  806. },
  807. },
  808. {
  809. stmt: &xsql.StreamStmt{
  810. Name: xsql.StreamName("demo"),
  811. StreamFields: []xsql.StreamField{
  812. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  813. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  814. },
  815. Options: &xsql.Options{
  816. DATASOURCE: "users",
  817. TIMESTAMP: "def",
  818. TIMESTAMP_FORMAT: "yyyy-MM-ddaHH:mm:ss",
  819. },
  820. },
  821. data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
  822. result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
  823. },
  824. }
  825. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  826. defer common.CloseLogger()
  827. contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
  828. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  829. for i, tt := range tests {
  830. pp := &Preprocessor{
  831. defaultFieldProcessor: defaultFieldProcessor{
  832. streamFields: convertFields(tt.stmt.StreamFields),
  833. aliasFields: nil,
  834. isBinary: false,
  835. timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
  836. },
  837. isEventTime: true,
  838. timestampField: tt.stmt.Options.TIMESTAMP,
  839. }
  840. dm := make(map[string]interface{})
  841. if e := json.Unmarshal(tt.data, &dm); e != nil {
  842. log.Fatal(e)
  843. return
  844. } else {
  845. tuple := &xsql.Tuple{Message: dm}
  846. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  847. result := pp.Apply(ctx, tuple, fv, afv)
  848. //workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
  849. if rt, ok := result.(*xsql.Tuple); ok {
  850. if rtt, ok := rt.Message["abc"].(time.Time); ok {
  851. rt.Message["abc"] = rtt.UTC()
  852. }
  853. }
  854. if !reflect.DeepEqual(tt.result, result) {
  855. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  856. }
  857. }
  858. }
  859. }
  860. func TestPreprocessorError(t *testing.T) {
  861. tests := []struct {
  862. stmt *xsql.StreamStmt
  863. data []byte
  864. result interface{}
  865. }{
  866. {
  867. stmt: &xsql.StreamStmt{
  868. Name: xsql.StreamName("demo"),
  869. StreamFields: []xsql.StreamField{
  870. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  871. },
  872. },
  873. data: []byte(`{"abc": "dafsad"}`),
  874. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
  875. }, {
  876. stmt: &xsql.StreamStmt{
  877. Name: xsql.StreamName("demo"),
  878. StreamFields: []xsql.StreamField{
  879. {Name: "a", FieldType: &xsql.RecType{
  880. StreamFields: []xsql.StreamField{
  881. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  882. },
  883. }},
  884. },
  885. },
  886. data: []byte(`{"a": {"d" : "hello"}}`),
  887. result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
  888. }, {
  889. stmt: &xsql.StreamStmt{
  890. Name: xsql.StreamName("demo"),
  891. StreamFields: []xsql.StreamField{
  892. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  893. },
  894. Options: &xsql.Options{
  895. DATASOURCE: "users",
  896. FORMAT: "JSON",
  897. KEY: "USERID",
  898. CONF_KEY: "srv1",
  899. TYPE: "MQTT",
  900. TIMESTAMP: "abc",
  901. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  902. },
  903. },
  904. data: []byte(`{"abc": "not a time"}`),
  905. result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
  906. },
  907. }
  908. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  909. defer common.CloseLogger()
  910. contextLogger := common.Log.WithField("rule", "TestPreprocessorError")
  911. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  912. for i, tt := range tests {
  913. pp := &Preprocessor{}
  914. pp.streamFields = convertFields(tt.stmt.StreamFields)
  915. dm := make(map[string]interface{})
  916. if e := json.Unmarshal(tt.data, &dm); e != nil {
  917. log.Fatal(e)
  918. return
  919. } else {
  920. tuple := &xsql.Tuple{Message: dm}
  921. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  922. result := pp.Apply(ctx, tuple, fv, afv)
  923. if !reflect.DeepEqual(tt.result, result) {
  924. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
  925. }
  926. }
  927. }
  928. }
  929. func TestPreprocessorForBinary(t *testing.T) {
  930. docsFolder, err := common.GetLoc("/docs/")
  931. if err != nil {
  932. t.Errorf("Cannot find docs folder: %v", err)
  933. }
  934. image, err := ioutil.ReadFile(path.Join(docsFolder, "cover.jpg"))
  935. if err != nil {
  936. t.Errorf("Cannot read image: %v", err)
  937. }
  938. b64img := base64.StdEncoding.EncodeToString(image)
  939. //TODO test bytea type conversion to string or else
  940. var tests = []struct {
  941. stmt *xsql.StreamStmt
  942. data []byte
  943. isBinary bool
  944. result interface{}
  945. }{
  946. {
  947. stmt: &xsql.StreamStmt{
  948. Name: xsql.StreamName("demo"),
  949. StreamFields: nil,
  950. },
  951. data: image,
  952. isBinary: true,
  953. result: &xsql.Tuple{Message: xsql.Message{
  954. "self": image,
  955. },
  956. },
  957. },
  958. {
  959. stmt: &xsql.StreamStmt{
  960. Name: xsql.StreamName("demo"),
  961. StreamFields: []xsql.StreamField{
  962. {Name: "img", FieldType: &xsql.BasicType{Type: xsql.BYTEA}},
  963. },
  964. },
  965. data: image,
  966. isBinary: true,
  967. result: &xsql.Tuple{Message: xsql.Message{
  968. "img": image,
  969. },
  970. },
  971. },
  972. {
  973. stmt: &xsql.StreamStmt{
  974. Name: xsql.StreamName("demo"),
  975. StreamFields: []xsql.StreamField{
  976. {Name: "a", FieldType: &xsql.RecType{
  977. StreamFields: []xsql.StreamField{
  978. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.BYTEA}},
  979. },
  980. }},
  981. },
  982. },
  983. data: []byte(fmt.Sprintf(`{"a": {"b" : "%s"}}`, b64img)),
  984. result: &xsql.Tuple{Message: xsql.Message{
  985. "a": map[string]interface{}{
  986. "b": image,
  987. },
  988. },
  989. },
  990. },
  991. {
  992. stmt: &xsql.StreamStmt{
  993. Name: xsql.StreamName("demo"),
  994. StreamFields: []xsql.StreamField{
  995. {Name: "a", FieldType: &xsql.ArrayType{
  996. Type: xsql.BYTEA,
  997. }},
  998. },
  999. },
  1000. data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
  1001. result: &xsql.Tuple{Message: xsql.Message{
  1002. "a": [][]byte{
  1003. image,
  1004. },
  1005. },
  1006. },
  1007. },
  1008. {
  1009. stmt: &xsql.StreamStmt{
  1010. Name: xsql.StreamName("demo"),
  1011. StreamFields: []xsql.StreamField{
  1012. {Name: "a", FieldType: &xsql.ArrayType{
  1013. Type: xsql.STRUCT,
  1014. FieldType: &xsql.RecType{
  1015. StreamFields: []xsql.StreamField{
  1016. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.BYTEA}},
  1017. },
  1018. },
  1019. }},
  1020. },
  1021. },
  1022. data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
  1023. result: &xsql.Tuple{Message: xsql.Message{
  1024. "a": []map[string]interface{}{
  1025. {"b": image},
  1026. },
  1027. },
  1028. },
  1029. },
  1030. }
  1031. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1032. defer common.CloseLogger()
  1033. contextLogger := common.Log.WithField("rule", "TestPreprocessorForBinary")
  1034. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1035. for i, tt := range tests {
  1036. pp := &Preprocessor{}
  1037. pp.streamFields = convertFields(tt.stmt.StreamFields)
  1038. pp.isBinary = tt.isBinary
  1039. format := "json"
  1040. if tt.isBinary {
  1041. format = "binary"
  1042. }
  1043. if dm, e := common.MessageDecode(tt.data, format); e != nil {
  1044. log.Fatal(e)
  1045. return
  1046. } else {
  1047. tuple := &xsql.Tuple{Message: dm}
  1048. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1049. result := pp.Apply(ctx, tuple, fv, afv)
  1050. if !reflect.DeepEqual(tt.result, result) {
  1051. t.Errorf("%d. %q\n\nresult mismatch", i, tuple)
  1052. }
  1053. }
  1054. }
  1055. }