converter.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package json
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/valyala/fastjson"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. "github.com/lf-edge/ekuiper/pkg/message"
  21. )
  // Converter is the default JSON converter. It holds no state; its methods
  // delegate to the standard encoding/json package.
  type Converter struct{}

  // converter is the single package-level instance handed out by GetConverter.
  var converter = new(Converter)
  24. func GetConverter() (message.Converter, error) {
  25. return converter, nil
  26. }
  27. func (c *Converter) Encode(d interface{}) ([]byte, error) {
  28. return json.Marshal(d)
  29. }
  30. func (c *Converter) Decode(b []byte) (interface{}, error) {
  31. var r0 interface{}
  32. err := json.Unmarshal(b, &r0)
  33. if err != nil {
  34. return nil, err
  35. }
  36. return r0, nil
  37. }
  38. type FastJsonConverter struct {
  39. schema map[string]*ast.JsonStreamField
  40. }
  41. func NewFastJsonConverter(schema map[string]*ast.JsonStreamField) *FastJsonConverter {
  42. return &FastJsonConverter{
  43. schema: schema,
  44. }
  45. }
  46. func (c *FastJsonConverter) Encode(d interface{}) ([]byte, error) {
  47. return json.Marshal(d)
  48. }
  49. func (c *FastJsonConverter) Decode(b []byte) (interface{}, error) {
  50. return c.decodeWithSchema(b, c.schema)
  51. }
  52. func (f *FastJsonConverter) decodeWithSchema(b []byte, schema map[string]*ast.JsonStreamField) (interface{}, error) {
  53. var p fastjson.Parser
  54. v, err := p.ParseBytes(b)
  55. if err != nil {
  56. return nil, err
  57. }
  58. switch v.Type() {
  59. case fastjson.TypeArray:
  60. array, err := v.Array()
  61. if err != nil {
  62. return nil, err
  63. }
  64. ms := make([]map[string]interface{}, len(array))
  65. for i, v := range array {
  66. obj, err := v.Object()
  67. if err != nil {
  68. return nil, err
  69. }
  70. subMap, err := f.decodeObject(obj, schema)
  71. if err != nil {
  72. return nil, err
  73. }
  74. ms[i] = subMap
  75. }
  76. return ms, nil
  77. case fastjson.TypeObject:
  78. obj, err := v.Object()
  79. if err != nil {
  80. return nil, err
  81. }
  82. m, err := f.decodeObject(obj, schema)
  83. if err != nil {
  84. return nil, err
  85. }
  86. return m, nil
  87. }
  88. return nil, fmt.Errorf("only map[string]interface{} and []map[string]interface{} is supported")
  89. }
  90. func (f *FastJsonConverter) decodeArray(array []*fastjson.Value, field *ast.JsonStreamField) ([]interface{}, error) {
  91. vs := make([]interface{}, len(array))
  92. switch field.Type {
  93. case "bigint", "float":
  94. for i, item := range array {
  95. typ := item.Type()
  96. switch typ {
  97. case fastjson.TypeNumber:
  98. f64, err := item.Float64()
  99. if err != nil {
  100. return nil, err
  101. }
  102. vs[i] = f64
  103. default:
  104. return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
  105. }
  106. }
  107. case "string", "bytea":
  108. for i, item := range array {
  109. typ := item.Type()
  110. switch typ {
  111. case fastjson.TypeString:
  112. s, err := item.StringBytes()
  113. if err != nil {
  114. return nil, err
  115. }
  116. vs[i] = string(s)
  117. case fastjson.TypeNumber:
  118. f64, err := item.Float64()
  119. if err != nil {
  120. return nil, err
  121. }
  122. vs[i] = f64
  123. default:
  124. return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
  125. }
  126. }
  127. case "array":
  128. for i, item := range array {
  129. typ := item.Type()
  130. switch typ {
  131. case fastjson.TypeArray:
  132. childArrays, err := item.Array()
  133. if err != nil {
  134. return nil, err
  135. }
  136. subList, err := f.decodeArray(childArrays, field.Items)
  137. if err != nil {
  138. return nil, err
  139. }
  140. vs[i] = subList
  141. default:
  142. return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
  143. }
  144. }
  145. case "struct":
  146. for i, item := range array {
  147. typ := item.Type()
  148. switch typ {
  149. case fastjson.TypeObject:
  150. childObj, err := item.Object()
  151. if err != nil {
  152. return nil, err
  153. }
  154. subMap, err := f.decodeObject(childObj, field.Properties)
  155. if err != nil {
  156. return nil, err
  157. }
  158. vs[i] = subMap
  159. default:
  160. return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
  161. }
  162. }
  163. case "boolean":
  164. for i, item := range array {
  165. typ := item.Type()
  166. switch typ {
  167. case fastjson.TypeTrue, fastjson.TypeFalse:
  168. b, err := item.Bool()
  169. if err != nil {
  170. return nil, err
  171. }
  172. vs[i] = b
  173. default:
  174. return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
  175. }
  176. }
  177. case "datetime":
  178. for i, item := range array {
  179. typ := item.Type()
  180. switch typ {
  181. case fastjson.TypeNumber:
  182. f64, err := item.Float64()
  183. if err != nil {
  184. return nil, err
  185. }
  186. vs[i] = f64
  187. case fastjson.TypeString:
  188. s, err := item.StringBytes()
  189. if err != nil {
  190. return nil, err
  191. }
  192. vs[i] = string(s)
  193. default:
  194. return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
  195. }
  196. }
  197. default:
  198. return nil, fmt.Errorf("unknown filed type:%s", field.Type)
  199. }
  200. return vs, nil
  201. }
  202. func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string]*ast.JsonStreamField) (map[string]interface{}, error) {
  203. m := make(map[string]interface{})
  204. for key, field := range schema {
  205. if obj.Get(key) == nil {
  206. continue
  207. }
  208. switch field.Type {
  209. case "bigint", "float":
  210. typ := obj.Get(key).Type()
  211. switch typ {
  212. case fastjson.TypeNumber:
  213. f64v, err := obj.Get(key).Float64()
  214. if err != nil {
  215. return nil, err
  216. }
  217. m[key] = f64v
  218. default:
  219. return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
  220. }
  221. case "string", "bytea":
  222. typ := obj.Get(key).Type()
  223. switch typ {
  224. case fastjson.TypeString:
  225. s, err := obj.Get(key).StringBytes()
  226. if err != nil {
  227. return nil, err
  228. }
  229. m[key] = string(s)
  230. case fastjson.TypeNumber:
  231. f64v, err := obj.Get(key).Float64()
  232. if err != nil {
  233. return nil, err
  234. }
  235. m[key] = f64v
  236. default:
  237. return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
  238. }
  239. case "array":
  240. typ := obj.Get(key).Type()
  241. switch typ {
  242. case fastjson.TypeArray:
  243. childArray, err := obj.Get(key).Array()
  244. if err != nil {
  245. return nil, err
  246. }
  247. subList, err := f.decodeArray(childArray, schema[key].Items)
  248. if err != nil {
  249. return nil, err
  250. }
  251. m[key] = subList
  252. default:
  253. return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
  254. }
  255. case "struct":
  256. typ := obj.Get(key).Type()
  257. switch typ {
  258. case fastjson.TypeObject:
  259. childObj, err := obj.Get(key).Object()
  260. if err != nil {
  261. return nil, err
  262. }
  263. childMap, err := f.decodeObject(childObj, schema[key].Properties)
  264. if err != nil {
  265. return nil, err
  266. }
  267. m[key] = childMap
  268. default:
  269. return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
  270. }
  271. case "boolean":
  272. typ := obj.Get(key).Type()
  273. switch typ {
  274. case fastjson.TypeFalse, fastjson.TypeTrue:
  275. b, err := obj.Get(key).Bool()
  276. if err != nil {
  277. return nil, err
  278. }
  279. m[key] = b
  280. default:
  281. return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
  282. }
  283. case "datetime":
  284. typ := obj.Get(key).Type()
  285. switch typ {
  286. case fastjson.TypeString:
  287. s, err := obj.Get(key).StringBytes()
  288. if err != nil {
  289. return nil, err
  290. }
  291. m[key] = string(s)
  292. case fastjson.TypeNumber:
  293. f64v, err := obj.Get(key).Float64()
  294. if err != nil {
  295. return nil, err
  296. }
  297. m[key] = f64v
  298. default:
  299. return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
  300. }
  301. default:
  302. return nil, fmt.Errorf("unknown filed type:%s", field.Type)
  303. }
  304. }
  305. return m, nil
  306. }