field_processor.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/base64"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/ast"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "github.com/lf-edge/ekuiper/pkg/message"
  24. "math"
  25. "reflect"
  26. "strconv"
  27. "time"
  28. )
  29. type defaultFieldProcessor struct {
  30. streamFields []interface{}
  31. timestampFormat string
  32. isBinary bool
  33. strictValidation bool
  34. }
  35. func (p *defaultFieldProcessor) processField(tuple *xsql.Tuple, _ *xsql.FunctionValuer) (map[string]interface{}, error) {
  36. result := make(map[string]interface{})
  37. if p.streamFields != nil {
  38. for _, f := range p.streamFields {
  39. switch sf := f.(type) {
  40. case *ast.StreamField:
  41. if p.isBinary {
  42. tuple.Message[sf.Name] = tuple.Message[message.DefaultField]
  43. }
  44. if e := p.addRecField(sf.FieldType, result, tuple.Message, sf.Name); e != nil {
  45. return nil, e
  46. }
  47. case string: //schemaless
  48. if p.isBinary {
  49. result = tuple.Message
  50. } else {
  51. if m, ok := tuple.Message.Value(sf, ""); ok {
  52. result[sf] = m
  53. }
  54. }
  55. }
  56. if p.isBinary {
  57. break //binary format should only have ONE field
  58. }
  59. }
  60. } else {
  61. result = tuple.Message
  62. }
  63. return result, nil
  64. }
  65. func (p *defaultFieldProcessor) addRecField(ft ast.FieldType, r map[string]interface{}, j xsql.Message, n string) error {
  66. if t, ok := j.Value(n, ""); ok {
  67. v := reflect.ValueOf(t)
  68. jtype := v.Kind()
  69. switch st := ft.(type) {
  70. case *ast.BasicType:
  71. switch st.Type {
  72. case ast.UNKNOWN:
  73. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  74. case ast.BIGINT:
  75. if jtype == reflect.Int {
  76. r[n] = t.(int)
  77. } else if jtype == reflect.Float64 {
  78. if tt, ok1 := t.(float64); ok1 {
  79. if tt > math.MaxInt64 {
  80. r[n] = uint64(tt)
  81. } else {
  82. r[n] = int(tt)
  83. }
  84. }
  85. } else if jtype == reflect.String {
  86. if i, err := strconv.Atoi(t.(string)); err != nil {
  87. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  88. } else {
  89. r[n] = i
  90. }
  91. } else if jtype == reflect.Uint64 {
  92. r[n] = t.(uint64)
  93. } else {
  94. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  95. }
  96. case ast.FLOAT:
  97. if jtype == reflect.Float64 {
  98. r[n] = t.(float64)
  99. } else if jtype == reflect.String {
  100. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  101. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  102. } else {
  103. r[n] = f
  104. }
  105. } else {
  106. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  107. }
  108. case ast.STRINGS:
  109. if jtype == reflect.String {
  110. r[n] = t.(string)
  111. } else {
  112. return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
  113. }
  114. case ast.DATETIME:
  115. switch jtype {
  116. case reflect.Int:
  117. ai := t.(int64)
  118. r[n] = cast.TimeFromUnixMilli(ai)
  119. case reflect.Float64:
  120. ai := int64(t.(float64))
  121. r[n] = cast.TimeFromUnixMilli(ai)
  122. case reflect.String:
  123. if t, err := p.parseTime(t.(string)); err != nil {
  124. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  125. } else {
  126. r[n] = t
  127. }
  128. default:
  129. return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
  130. }
  131. case ast.BOOLEAN:
  132. if jtype == reflect.Bool {
  133. r[n] = t.(bool)
  134. } else if jtype == reflect.String {
  135. if i, err := strconv.ParseBool(t.(string)); err != nil {
  136. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  137. } else {
  138. r[n] = i
  139. }
  140. } else {
  141. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  142. }
  143. case ast.BYTEA:
  144. if jtype == reflect.String {
  145. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  146. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v) which cannot base64 decode", n, t)
  147. } else {
  148. r[n] = b
  149. }
  150. } else if jtype == reflect.Slice {
  151. if b, ok := t.([]byte); ok {
  152. r[n] = b
  153. } else {
  154. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v)", n, t)
  155. }
  156. }
  157. default:
  158. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  159. }
  160. case *ast.ArrayType:
  161. var s []interface{}
  162. if t == nil {
  163. s = nil
  164. } else if jtype == reflect.Slice {
  165. s = t.([]interface{})
  166. } else if jtype == reflect.String {
  167. err := json.Unmarshal([]byte(t.(string)), &s)
  168. if err != nil {
  169. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  170. }
  171. } else {
  172. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  173. }
  174. if tempArr, err := p.addArrayField(st, s); err != nil {
  175. return fmt.Errorf("fail to parse field %s: %s", n, err)
  176. } else {
  177. r[n] = tempArr
  178. }
  179. case *ast.RecType:
  180. nextJ := make(map[string]interface{})
  181. if t == nil {
  182. nextJ = nil
  183. r[n] = nextJ
  184. return nil
  185. } else if jtype == reflect.Map {
  186. nextJ, ok = t.(map[string]interface{})
  187. if !ok {
  188. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  189. }
  190. } else if jtype == reflect.String {
  191. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  192. if err != nil {
  193. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  194. }
  195. } else {
  196. return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
  197. }
  198. nextR := make(map[string]interface{})
  199. for _, nextF := range st.StreamFields {
  200. nextP := nextF.Name
  201. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  202. return e
  203. }
  204. }
  205. r[n] = nextR
  206. default:
  207. return fmt.Errorf("unsupported type %T", st)
  208. }
  209. } else {
  210. if p.strictValidation {
  211. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  212. } else {
  213. switch st := ft.(type) {
  214. case *ast.BasicType:
  215. switch st.Type {
  216. case ast.UNKNOWN:
  217. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  218. case ast.BIGINT:
  219. r[n] = int64(0)
  220. case ast.FLOAT:
  221. r[n] = 0.0
  222. case ast.STRINGS:
  223. r[n] = ""
  224. case ast.DATETIME:
  225. r[n] = conf.GetNow()
  226. case ast.BOOLEAN:
  227. r[n] = false
  228. case ast.BYTEA:
  229. r[n] = []byte{}
  230. default:
  231. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  232. }
  233. case *ast.ArrayType:
  234. if tempArr, err := p.addArrayField(st, nil); err != nil {
  235. return fmt.Errorf("fail to parse field %s: %s", n, err)
  236. } else {
  237. r[n] = tempArr
  238. }
  239. case *ast.RecType:
  240. r[n] = make(map[string]interface{})
  241. default:
  242. return fmt.Errorf("unsupported type %T", st)
  243. }
  244. }
  245. }
  246. return nil
  247. }
  248. //ft must be ast.ArrayType
  249. //side effect: r[p] will be set to the new array
  250. func (p *defaultFieldProcessor) addArrayField(ft *ast.ArrayType, srcSlice []interface{}) (interface{}, error) {
  251. if ft.FieldType != nil { //complex type array or struct
  252. switch st := ft.FieldType.(type) { //Only two complex types supported here
  253. case *ast.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  254. if srcSlice == nil {
  255. return [][]interface{}(nil), nil
  256. }
  257. var s []interface{}
  258. var tempSlice reflect.Value
  259. for i, t := range srcSlice {
  260. jtype := reflect.ValueOf(t).Kind()
  261. if t == nil {
  262. s = nil
  263. } else if jtype == reflect.Slice || jtype == reflect.Array {
  264. s = t.([]interface{})
  265. } else if jtype == reflect.String {
  266. err := json.Unmarshal([]byte(t.(string)), &s)
  267. if err != nil {
  268. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  269. }
  270. } else {
  271. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  272. }
  273. if tempArr, err := p.addArrayField(st, s); err != nil {
  274. return nil, err
  275. } else {
  276. if !tempSlice.IsValid() {
  277. s := reflect.TypeOf(tempArr)
  278. tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
  279. }
  280. tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
  281. }
  282. }
  283. return tempSlice.Interface(), nil
  284. case *ast.RecType:
  285. if srcSlice == nil {
  286. return []map[string]interface{}(nil), nil
  287. }
  288. tempSlice := make([]map[string]interface{}, 0)
  289. for i, t := range srcSlice {
  290. jtype := reflect.ValueOf(t).Kind()
  291. j := make(map[string]interface{})
  292. var ok bool
  293. if t == nil {
  294. j = nil
  295. tempSlice = append(tempSlice, j)
  296. continue
  297. } else if jtype == reflect.Map {
  298. j, ok = t.(map[string]interface{})
  299. if !ok {
  300. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  301. }
  302. } else if jtype == reflect.String {
  303. err := json.Unmarshal([]byte(t.(string)), &j)
  304. if err != nil {
  305. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  306. }
  307. } else {
  308. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  309. }
  310. r := make(map[string]interface{})
  311. for _, f := range st.StreamFields {
  312. n := f.Name
  313. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  314. return nil, e
  315. }
  316. }
  317. tempSlice = append(tempSlice, r)
  318. }
  319. return tempSlice, nil
  320. default:
  321. return nil, fmt.Errorf("unsupported type %T", st)
  322. }
  323. } else { //basic type
  324. switch ft.Type {
  325. case ast.UNKNOWN:
  326. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  327. case ast.BIGINT:
  328. if srcSlice == nil {
  329. return []int(nil), nil
  330. }
  331. tempSlice := make([]int, 0)
  332. for i, t := range srcSlice {
  333. jtype := reflect.ValueOf(t).Kind()
  334. if jtype == reflect.Float64 {
  335. tempSlice = append(tempSlice, int(t.(float64)))
  336. } else if jtype == reflect.String {
  337. if v, err := strconv.Atoi(t.(string)); err != nil {
  338. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  339. } else {
  340. tempSlice = append(tempSlice, v)
  341. }
  342. } else {
  343. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  344. }
  345. }
  346. return tempSlice, nil
  347. case ast.FLOAT:
  348. if srcSlice == nil {
  349. return []float64(nil), nil
  350. }
  351. tempSlice := make([]float64, 0)
  352. for i, t := range srcSlice {
  353. jtype := reflect.ValueOf(t).Kind()
  354. if jtype == reflect.Float64 {
  355. tempSlice = append(tempSlice, t.(float64))
  356. } else if jtype == reflect.String {
  357. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  358. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  359. } else {
  360. tempSlice = append(tempSlice, f)
  361. }
  362. } else {
  363. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  364. }
  365. }
  366. return tempSlice, nil
  367. case ast.STRINGS:
  368. if srcSlice == nil {
  369. return []string(nil), nil
  370. }
  371. tempSlice := make([]string, 0)
  372. for i, t := range srcSlice {
  373. if reflect.ValueOf(t).Kind() == reflect.String {
  374. tempSlice = append(tempSlice, t.(string))
  375. } else {
  376. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
  377. }
  378. }
  379. return tempSlice, nil
  380. case ast.DATETIME:
  381. if srcSlice == nil {
  382. return []time.Time(nil), nil
  383. }
  384. tempSlice := make([]time.Time, 0)
  385. for i, t := range srcSlice {
  386. jtype := reflect.ValueOf(t).Kind()
  387. switch jtype {
  388. case reflect.Int:
  389. ai := t.(int64)
  390. tempSlice = append(tempSlice, cast.TimeFromUnixMilli(ai))
  391. case reflect.Float64:
  392. ai := int64(t.(float64))
  393. tempSlice = append(tempSlice, cast.TimeFromUnixMilli(ai))
  394. case reflect.String:
  395. if ai, err := p.parseTime(t.(string)); err != nil {
  396. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
  397. } else {
  398. tempSlice = append(tempSlice, ai)
  399. }
  400. default:
  401. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
  402. }
  403. }
  404. return tempSlice, nil
  405. case ast.BOOLEAN:
  406. if srcSlice == nil {
  407. return []bool(nil), nil
  408. }
  409. tempSlice := make([]bool, 0)
  410. for i, t := range srcSlice {
  411. jtype := reflect.ValueOf(t).Kind()
  412. if jtype == reflect.Bool {
  413. tempSlice = append(tempSlice, t.(bool))
  414. } else if jtype == reflect.String {
  415. if v, err := strconv.ParseBool(t.(string)); err != nil {
  416. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  417. } else {
  418. tempSlice = append(tempSlice, v)
  419. }
  420. } else {
  421. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  422. }
  423. }
  424. return tempSlice, nil
  425. case ast.BYTEA:
  426. if srcSlice == nil {
  427. return [][]byte(nil), nil
  428. }
  429. tempSlice := make([][]byte, 0)
  430. for i, t := range srcSlice {
  431. jtype := reflect.ValueOf(t).Kind()
  432. if jtype == reflect.String {
  433. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  434. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v) which cannot base64 decode", i, t)
  435. } else {
  436. tempSlice = append(tempSlice, b)
  437. }
  438. } else if jtype == reflect.Slice {
  439. if b, ok := t.([]byte); ok {
  440. tempSlice = append(tempSlice, b)
  441. } else {
  442. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v)", i, t)
  443. }
  444. }
  445. }
  446. return tempSlice, nil
  447. default:
  448. return nil, fmt.Errorf("invalid data type for %T", ft.Type)
  449. }
  450. }
  451. }
  452. func (p *defaultFieldProcessor) parseTime(s string) (time.Time, error) {
  453. if p.timestampFormat != "" {
  454. return cast.ParseTime(s, p.timestampFormat)
  455. } else {
  456. return time.Parse(cast.JSISO, s)
  457. }
  458. }