converter_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package json
  15. import (
  16. "encoding/base64"
  17. "fmt"
  18. "os"
  19. "path"
  20. "reflect"
  21. "testing"
  22. "github.com/stretchr/testify/require"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. )
  25. func TestMessageDecode(t *testing.T) {
  26. image, err := os.ReadFile(path.Join("../../../docs", "cover.jpg"))
  27. if err != nil {
  28. t.Errorf("Cannot read image: %v", err)
  29. }
  30. b64img := base64.StdEncoding.EncodeToString(image)
  31. tests := []struct {
  32. payload []byte
  33. format string
  34. result map[string]interface{}
  35. results []interface{}
  36. }{
  37. {
  38. payload: []byte(fmt.Sprintf(`{"format":"jpg","content":"%s"}`, b64img)),
  39. format: "json",
  40. result: map[string]interface{}{
  41. "format": "jpg",
  42. "content": b64img,
  43. },
  44. },
  45. {
  46. payload: []byte(`[{"a":1},{"a":2}]`),
  47. format: "json",
  48. results: []interface{}{
  49. map[string]interface{}{
  50. "a": float64(1),
  51. },
  52. map[string]interface{}{
  53. "a": float64(2),
  54. },
  55. },
  56. },
  57. }
  58. conv, _ := GetConverter()
  59. for i, tt := range tests {
  60. result, err := conv.Decode(tt.payload)
  61. if err != nil {
  62. t.Errorf("%d decode error: %v", i, err)
  63. }
  64. if len(tt.results) > 0 {
  65. if !reflect.DeepEqual(tt.results, result) {
  66. t.Errorf("%d result mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, result)
  67. }
  68. } else {
  69. if !reflect.DeepEqual(tt.result, result) {
  70. t.Errorf("%d result mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, result)
  71. }
  72. }
  73. }
  74. }
  75. func TestFastJsonConverterWithSchema(t *testing.T) {
  76. origin := "123"
  77. encode := base64.StdEncoding.EncodeToString([]byte(origin))
  78. testcases := []struct {
  79. schema map[string]*ast.JsonStreamField
  80. payload []byte
  81. require map[string]interface{}
  82. }{
  83. {
  84. payload: []byte(`{"a":["true"]}`),
  85. schema: map[string]*ast.JsonStreamField{
  86. "a": {
  87. Type: "array",
  88. Items: &ast.JsonStreamField{
  89. Type: "boolean",
  90. },
  91. },
  92. },
  93. require: map[string]interface{}{
  94. "a": []interface{}{true},
  95. },
  96. },
  97. {
  98. payload: []byte(`{"a":[true]}`),
  99. schema: map[string]*ast.JsonStreamField{
  100. "a": {
  101. Type: "array",
  102. Items: &ast.JsonStreamField{
  103. Type: "boolean",
  104. },
  105. },
  106. },
  107. require: map[string]interface{}{
  108. "a": []interface{}{true},
  109. },
  110. },
  111. {
  112. payload: []byte(`{"a":1}`),
  113. schema: map[string]*ast.JsonStreamField{
  114. "a": {
  115. Type: "bigint",
  116. },
  117. },
  118. require: map[string]interface{}{
  119. "a": int64(1),
  120. },
  121. },
  122. {
  123. payload: []byte(`{"a":1}`),
  124. schema: map[string]*ast.JsonStreamField{
  125. "a": {
  126. Type: "float",
  127. },
  128. },
  129. require: map[string]interface{}{
  130. "a": float64(1),
  131. },
  132. },
  133. {
  134. payload: []byte(`{"a":"a"}`),
  135. schema: map[string]*ast.JsonStreamField{
  136. "a": {
  137. Type: "string",
  138. },
  139. },
  140. require: map[string]interface{}{
  141. "a": "a",
  142. },
  143. },
  144. {
  145. payload: []byte(fmt.Sprintf(`{"a":"%v"}`, encode)),
  146. schema: map[string]*ast.JsonStreamField{
  147. "a": {
  148. Type: "bytea",
  149. },
  150. },
  151. require: map[string]interface{}{
  152. "a": []byte(origin),
  153. },
  154. },
  155. {
  156. payload: []byte(`{"a":true}`),
  157. schema: map[string]*ast.JsonStreamField{
  158. "a": {
  159. Type: "boolean",
  160. },
  161. },
  162. require: map[string]interface{}{
  163. "a": true,
  164. },
  165. },
  166. {
  167. payload: []byte(`{"a":123}`),
  168. schema: map[string]*ast.JsonStreamField{
  169. "a": {
  170. Type: "datetime",
  171. },
  172. },
  173. require: map[string]interface{}{
  174. "a": float64(123),
  175. },
  176. },
  177. {
  178. payload: []byte(`{"a":"123"}`),
  179. schema: map[string]*ast.JsonStreamField{
  180. "a": {
  181. Type: "datetime",
  182. },
  183. },
  184. require: map[string]interface{}{
  185. "a": "123",
  186. },
  187. },
  188. {
  189. payload: []byte(`{"a":{"b":1}}`),
  190. schema: map[string]*ast.JsonStreamField{
  191. "a": {
  192. Type: "struct",
  193. Properties: map[string]*ast.JsonStreamField{
  194. "b": {
  195. Type: "bigint",
  196. },
  197. },
  198. },
  199. },
  200. require: map[string]interface{}{
  201. "a": map[string]interface{}{
  202. "b": int64(1),
  203. },
  204. },
  205. },
  206. }
  207. for _, tc := range testcases {
  208. f := NewFastJsonConverter(tc.schema)
  209. v, err := f.Decode(tc.payload)
  210. require.NoError(t, err)
  211. require.Equal(t, v, tc.require)
  212. }
  213. for _, tc := range testcases {
  214. arrayPayload := []byte(fmt.Sprintf("[%s]", string(tc.payload)))
  215. arrayRequire := []map[string]interface{}{
  216. tc.require,
  217. }
  218. f := NewFastJsonConverter(tc.schema)
  219. v, err := f.Decode(arrayPayload)
  220. require.NoError(t, err)
  221. require.Equal(t, v, arrayRequire)
  222. }
  223. }
  224. func TestFastJsonConverterWithSchemaError(t *testing.T) {
  225. testcases := []struct {
  226. schema map[string]*ast.JsonStreamField
  227. payload []byte
  228. err error
  229. }{
  230. {
  231. payload: []byte(`{123}`),
  232. schema: map[string]*ast.JsonStreamField{
  233. "a": {
  234. Type: "bigint",
  235. },
  236. },
  237. err: fmt.Errorf(`cannot parse JSON: cannot parse object: cannot find opening '"" for object key; unparsed tail: "123}"`),
  238. },
  239. {
  240. payload: []byte(`123`),
  241. schema: map[string]*ast.JsonStreamField{
  242. "a": {
  243. Type: "bigint",
  244. },
  245. },
  246. err: fmt.Errorf("only map[string]interface{} and []map[string]interface{} is supported"),
  247. },
  248. {
  249. payload: []byte(`{"a":{"b":1}}`),
  250. schema: map[string]*ast.JsonStreamField{
  251. "a": {
  252. Type: "bigint",
  253. },
  254. },
  255. err: fmt.Errorf("a has wrong type:object, expect:bigint"),
  256. },
  257. {
  258. payload: []byte(`{"a":{"b":1}}`),
  259. schema: map[string]*ast.JsonStreamField{
  260. "a": {
  261. Type: "string",
  262. },
  263. },
  264. err: fmt.Errorf("a has wrong type:object, expect:string"),
  265. },
  266. {
  267. payload: []byte(`{"a":123}`),
  268. schema: map[string]*ast.JsonStreamField{
  269. "a": {
  270. Type: "array",
  271. },
  272. },
  273. err: fmt.Errorf("a has wrong type:number, expect:array"),
  274. },
  275. {
  276. payload: []byte(`{"a":123}`),
  277. schema: map[string]*ast.JsonStreamField{
  278. "a": {
  279. Type: "struct",
  280. },
  281. },
  282. err: fmt.Errorf("a has wrong type:number, expect:struct"),
  283. },
  284. {
  285. payload: []byte(`{"a":{"b":1}}`),
  286. schema: map[string]*ast.JsonStreamField{
  287. "a": {
  288. Type: "boolean",
  289. },
  290. },
  291. err: fmt.Errorf("parse a failed, err:wrong type:object, expect:boolean"),
  292. },
  293. {
  294. payload: []byte(`{"a":true}`),
  295. schema: map[string]*ast.JsonStreamField{
  296. "a": {
  297. Type: "datetime",
  298. },
  299. },
  300. err: fmt.Errorf("a has wrong type:true, expect:datetime"),
  301. },
  302. {
  303. payload: []byte(`{"a":[{"b":1}]}`),
  304. schema: map[string]*ast.JsonStreamField{
  305. "a": {
  306. Type: "array",
  307. Items: &ast.JsonStreamField{
  308. Type: "bigint",
  309. },
  310. },
  311. },
  312. err: fmt.Errorf("array has wrong type:object, expect:bigint"),
  313. },
  314. {
  315. payload: []byte(`{"a":[{"b":1}]}`),
  316. schema: map[string]*ast.JsonStreamField{
  317. "a": {
  318. Type: "array",
  319. Items: &ast.JsonStreamField{
  320. Type: "string",
  321. },
  322. },
  323. },
  324. err: fmt.Errorf("array has wrong type:object, expect:string"),
  325. },
  326. {
  327. payload: []byte(`{"a":[123]}`),
  328. schema: map[string]*ast.JsonStreamField{
  329. "a": {
  330. Type: "array",
  331. Items: &ast.JsonStreamField{
  332. Type: "array",
  333. },
  334. },
  335. },
  336. err: fmt.Errorf("array has wrong type:number, expect:array"),
  337. },
  338. {
  339. payload: []byte(`{"a":[123]}`),
  340. schema: map[string]*ast.JsonStreamField{
  341. "a": {
  342. Type: "array",
  343. Items: &ast.JsonStreamField{
  344. Type: "struct",
  345. },
  346. },
  347. },
  348. err: fmt.Errorf("array has wrong type:number, expect:struct"),
  349. },
  350. {
  351. payload: []byte(`{"a":[{"b":1}]}`),
  352. schema: map[string]*ast.JsonStreamField{
  353. "a": {
  354. Type: "array",
  355. Items: &ast.JsonStreamField{
  356. Type: "boolean",
  357. },
  358. },
  359. },
  360. err: fmt.Errorf("parse array failed, err:wrong type:object, expect:boolean"),
  361. },
  362. {
  363. payload: []byte(`{"a":[true]}`),
  364. schema: map[string]*ast.JsonStreamField{
  365. "a": {
  366. Type: "array",
  367. Items: &ast.JsonStreamField{
  368. Type: "datetime",
  369. },
  370. },
  371. },
  372. err: fmt.Errorf("array has wrong type:true, expect:datetime"),
  373. },
  374. }
  375. for _, tc := range testcases {
  376. f := NewFastJsonConverter(tc.schema)
  377. _, err := f.Decode(tc.payload)
  378. require.Error(t, err)
  379. require.Equal(t, err, tc.err)
  380. }
  381. }
  382. func TestFastJsonEncode(t *testing.T) {
  383. a := make(map[string]int)
  384. a["a"] = 1
  385. f := NewFastJsonConverter(nil)
  386. v, err := f.Encode(a)
  387. require.NoError(t, err)
  388. require.Equal(t, v, []byte(`{"a":1}`))
  389. }
  390. func TestArrayWithArray(t *testing.T) {
  391. payload := []byte(`{
  392. "a":[
  393. [
  394. {
  395. "c":1
  396. }
  397. ]
  398. ]
  399. }`)
  400. schema := map[string]*ast.JsonStreamField{
  401. "a": {
  402. Type: "array",
  403. Items: &ast.JsonStreamField{
  404. Type: "array",
  405. Items: &ast.JsonStreamField{
  406. Type: "struct",
  407. Properties: map[string]*ast.JsonStreamField{
  408. "c": {
  409. Type: "bigint",
  410. },
  411. },
  412. },
  413. },
  414. },
  415. }
  416. f := NewFastJsonConverter(schema)
  417. v, err := f.Decode(payload)
  418. require.NoError(t, err)
  419. require.Equal(t, v, map[string]interface{}{
  420. "a": []interface{}{
  421. []interface{}{
  422. map[string]interface{}{
  423. "c": int64(1),
  424. },
  425. },
  426. },
  427. })
  428. }
  429. func TestTypeNull(t *testing.T) {
  430. testcases := []struct {
  431. schema map[string]*ast.JsonStreamField
  432. payload []byte
  433. require map[string]interface{}
  434. }{
  435. {
  436. payload: []byte(`{"a":[null]}`),
  437. schema: map[string]*ast.JsonStreamField{
  438. "a": {
  439. Type: "array",
  440. Items: &ast.JsonStreamField{
  441. Type: "bytea",
  442. },
  443. },
  444. },
  445. require: map[string]interface{}{
  446. "a": []interface{}{nil},
  447. },
  448. },
  449. {
  450. payload: []byte(`{"a":[null]}`),
  451. schema: map[string]*ast.JsonStreamField{
  452. "a": {
  453. Type: "array",
  454. Items: &ast.JsonStreamField{
  455. Type: "string",
  456. },
  457. },
  458. },
  459. require: map[string]interface{}{
  460. "a": []interface{}{nil},
  461. },
  462. },
  463. {
  464. payload: []byte(`{"a":[null]}`),
  465. schema: map[string]*ast.JsonStreamField{
  466. "a": {
  467. Type: "array",
  468. Items: &ast.JsonStreamField{
  469. Type: "float",
  470. },
  471. },
  472. },
  473. require: map[string]interface{}{
  474. "a": []interface{}{nil},
  475. },
  476. },
  477. {
  478. payload: []byte(`{"a":[null]}`),
  479. schema: map[string]*ast.JsonStreamField{
  480. "a": {
  481. Type: "array",
  482. Items: &ast.JsonStreamField{
  483. Type: "bigint",
  484. },
  485. },
  486. },
  487. require: map[string]interface{}{
  488. "a": []interface{}{nil},
  489. },
  490. },
  491. {
  492. payload: []byte(`{"a":[null]}`),
  493. schema: map[string]*ast.JsonStreamField{
  494. "a": {
  495. Type: "array",
  496. Items: &ast.JsonStreamField{
  497. Type: "boolean",
  498. },
  499. },
  500. },
  501. require: map[string]interface{}{
  502. "a": []interface{}{nil},
  503. },
  504. },
  505. {
  506. payload: []byte(`{"a":null}`),
  507. schema: map[string]*ast.JsonStreamField{
  508. "a": {
  509. Type: "bigint",
  510. },
  511. },
  512. require: map[string]interface{}{
  513. "a": nil,
  514. },
  515. },
  516. {
  517. payload: []byte(`{"a":null}`),
  518. schema: map[string]*ast.JsonStreamField{
  519. "a": {
  520. Type: "float",
  521. },
  522. },
  523. require: map[string]interface{}{
  524. "a": nil,
  525. },
  526. },
  527. {
  528. payload: []byte(`{"a":null}`),
  529. schema: map[string]*ast.JsonStreamField{
  530. "a": {
  531. Type: "string",
  532. },
  533. },
  534. require: map[string]interface{}{
  535. "a": nil,
  536. },
  537. },
  538. {
  539. payload: []byte(`{"a":null}`),
  540. schema: map[string]*ast.JsonStreamField{
  541. "a": {
  542. Type: "bytea",
  543. },
  544. },
  545. require: map[string]interface{}{
  546. "a": nil,
  547. },
  548. },
  549. {
  550. payload: []byte(`{"a":null}`),
  551. schema: map[string]*ast.JsonStreamField{
  552. "a": {
  553. Type: "boolean",
  554. },
  555. },
  556. require: map[string]interface{}{
  557. "a": nil,
  558. },
  559. },
  560. {
  561. payload: []byte(`{"a":null}`),
  562. schema: map[string]*ast.JsonStreamField{
  563. "a": {
  564. Type: "datetime",
  565. },
  566. },
  567. require: map[string]interface{}{
  568. "a": nil,
  569. },
  570. },
  571. {
  572. payload: []byte(`{"a":{"b":null}}`),
  573. schema: map[string]*ast.JsonStreamField{
  574. "a": {
  575. Type: "struct",
  576. Properties: map[string]*ast.JsonStreamField{
  577. "b": {
  578. Type: "bigint",
  579. },
  580. },
  581. },
  582. },
  583. require: map[string]interface{}{
  584. "a": map[string]interface{}{
  585. "b": nil,
  586. },
  587. },
  588. },
  589. }
  590. for _, tc := range testcases {
  591. arrayPayload := []byte(fmt.Sprintf("[%s]", string(tc.payload)))
  592. arrayRequire := []map[string]interface{}{
  593. tc.require,
  594. }
  595. f := NewFastJsonConverter(tc.schema)
  596. v, err := f.Decode(arrayPayload)
  597. require.NoError(t, err)
  598. require.Equal(t, v, arrayRequire)
  599. }
  600. for _, tc := range testcases {
  601. arrayPayload := []byte(fmt.Sprintf("[%s]", string(tc.payload)))
  602. arrayRequire := []map[string]interface{}{
  603. tc.require,
  604. }
  605. f := NewFastJsonConverter(tc.schema)
  606. v, err := f.Decode(arrayPayload)
  607. require.NoError(t, err)
  608. require.Equal(t, v, arrayRequire)
  609. }
  610. }
  611. func TestConvertBytea(t *testing.T) {
  612. origin := "123"
  613. encode := base64.StdEncoding.EncodeToString([]byte(origin))
  614. payload := fmt.Sprintf(`{"a":"%s"}`, encode)
  615. schema := map[string]*ast.JsonStreamField{
  616. "a": {
  617. Type: "bytea",
  618. },
  619. }
  620. f := NewFastJsonConverter(schema)
  621. v, err := f.Decode([]byte(payload))
  622. require.NoError(t, err)
  623. require.Equal(t, v, map[string]interface{}{
  624. "a": []byte(origin),
  625. })
  626. payload = fmt.Sprintf(`{"a":["%s"]}`, encode)
  627. schema = map[string]*ast.JsonStreamField{
  628. "a": {
  629. Type: "array",
  630. Items: &ast.JsonStreamField{
  631. Type: "bytea",
  632. },
  633. },
  634. }
  635. f = NewFastJsonConverter(schema)
  636. v, err = f.Decode([]byte(payload))
  637. require.NoError(t, err)
  638. require.Equal(t, v, map[string]interface{}{
  639. "a": []interface{}{[]byte(origin)},
  640. })
  641. }