field_processor.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/base64"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/ast"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "github.com/lf-edge/ekuiper/pkg/message"
  24. "math"
  25. "reflect"
  26. "strconv"
  27. "strings"
  28. "time"
  29. )
  // defaultFieldProcessor turns the raw message of a tuple into a field map
  // shaped by the stream schema.
  type defaultFieldProcessor struct {
  	// streamFields lists the declared fields: *ast.StreamField entries for a
  	// typed schema or plain string names for a schemaless one. When nil, the
  	// message is passed through unchanged by processField.
  	streamFields []interface{}
  	// timestampFormat, when non-empty, is the format handed to cast.ParseTime
  	// for DATETIME strings; otherwise parseTime falls back to cast.JSISO.
  	timestampFormat string
  	// isBinary: only the first declared field is processed (see processField).
  	// strictValidation: a declared field missing from the message is an error
  	// instead of being filled with a default value (see addRecField).
  	isBinary, strictValidation bool
  }
  36. func (p *defaultFieldProcessor) processField(tuple *xsql.Tuple, _ *xsql.FunctionValuer) (map[string]interface{}, error) {
  37. result := make(map[string]interface{})
  38. if p.streamFields != nil {
  39. for _, f := range p.streamFields {
  40. switch sf := f.(type) {
  41. case *ast.StreamField:
  42. if p.isBinary {
  43. tuple.Message[sf.Name] = tuple.Message[message.DefaultField]
  44. }
  45. if e := p.addRecField(sf.FieldType, result, tuple.Message, sf.Name); e != nil {
  46. return nil, e
  47. }
  48. case string: //schemaless
  49. if p.isBinary {
  50. result = tuple.Message
  51. } else {
  52. if m, ok := tuple.Message.Value(sf); ok {
  53. result[sf] = m
  54. }
  55. }
  56. }
  57. if p.isBinary {
  58. break //binary format should only have ONE field
  59. }
  60. }
  61. } else {
  62. result = tuple.Message
  63. }
  64. return result, nil
  65. }
  66. func (p *defaultFieldProcessor) addRecField(ft ast.FieldType, r map[string]interface{}, j xsql.Message, n string) error {
  67. if t, ok := j.Value(n); ok {
  68. v := reflect.ValueOf(t)
  69. jtype := v.Kind()
  70. switch st := ft.(type) {
  71. case *ast.BasicType:
  72. switch st.Type {
  73. case ast.UNKNOWN:
  74. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  75. case ast.BIGINT:
  76. if jtype == reflect.Int {
  77. r[n] = t.(int)
  78. } else if jtype == reflect.Float64 {
  79. if tt, ok1 := t.(float64); ok1 {
  80. if tt > math.MaxInt64 {
  81. r[n] = uint64(tt)
  82. } else {
  83. r[n] = int(tt)
  84. }
  85. }
  86. } else if jtype == reflect.String {
  87. if i, err := strconv.Atoi(t.(string)); err != nil {
  88. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  89. } else {
  90. r[n] = i
  91. }
  92. } else if jtype == reflect.Uint64 {
  93. r[n] = t.(uint64)
  94. } else {
  95. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  96. }
  97. case ast.FLOAT:
  98. if jtype == reflect.Float64 {
  99. r[n] = t.(float64)
  100. } else if jtype == reflect.String {
  101. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  102. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  103. } else {
  104. r[n] = f
  105. }
  106. } else {
  107. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  108. }
  109. case ast.STRINGS:
  110. if jtype == reflect.String {
  111. r[n] = t.(string)
  112. } else {
  113. return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
  114. }
  115. case ast.DATETIME:
  116. switch jtype {
  117. case reflect.Int:
  118. ai := t.(int64)
  119. r[n] = cast.TimeFromUnixMilli(ai)
  120. case reflect.Float64:
  121. ai := int64(t.(float64))
  122. r[n] = cast.TimeFromUnixMilli(ai)
  123. case reflect.String:
  124. if t, err := p.parseTime(t.(string)); err != nil {
  125. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  126. } else {
  127. r[n] = t
  128. }
  129. default:
  130. return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
  131. }
  132. case ast.BOOLEAN:
  133. if jtype == reflect.Bool {
  134. r[n] = t.(bool)
  135. } else if jtype == reflect.String {
  136. if i, err := strconv.ParseBool(t.(string)); err != nil {
  137. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  138. } else {
  139. r[n] = i
  140. }
  141. } else {
  142. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  143. }
  144. case ast.BYTEA:
  145. if jtype == reflect.String {
  146. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  147. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v) which cannot base64 decode", n, t)
  148. } else {
  149. r[n] = b
  150. }
  151. } else if jtype == reflect.Slice {
  152. if b, ok := t.([]byte); ok {
  153. r[n] = b
  154. } else {
  155. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v)", n, t)
  156. }
  157. }
  158. default:
  159. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  160. }
  161. case *ast.ArrayType:
  162. var s []interface{}
  163. if t == nil {
  164. s = nil
  165. } else if jtype == reflect.Slice {
  166. s = t.([]interface{})
  167. } else if jtype == reflect.String {
  168. err := json.Unmarshal([]byte(t.(string)), &s)
  169. if err != nil {
  170. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  171. }
  172. } else {
  173. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  174. }
  175. if tempArr, err := p.addArrayField(st, s); err != nil {
  176. return fmt.Errorf("fail to parse field %s: %s", n, err)
  177. } else {
  178. r[n] = tempArr
  179. }
  180. case *ast.RecType:
  181. nextJ := make(map[string]interface{})
  182. if t == nil {
  183. nextJ = nil
  184. r[n] = nextJ
  185. return nil
  186. } else if jtype == reflect.Map {
  187. nextJ, ok = t.(map[string]interface{})
  188. if !ok {
  189. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  190. }
  191. } else if jtype == reflect.String {
  192. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  193. if err != nil {
  194. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  195. }
  196. } else {
  197. return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
  198. }
  199. nextR := make(map[string]interface{})
  200. for _, nextF := range st.StreamFields {
  201. nextP := strings.ToLower(nextF.Name)
  202. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  203. return e
  204. }
  205. }
  206. r[n] = nextR
  207. default:
  208. return fmt.Errorf("unsupported type %T", st)
  209. }
  210. } else {
  211. if p.strictValidation {
  212. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  213. } else {
  214. switch st := ft.(type) {
  215. case *ast.BasicType:
  216. switch st.Type {
  217. case ast.UNKNOWN:
  218. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  219. case ast.BIGINT:
  220. r[n] = int64(0)
  221. case ast.FLOAT:
  222. r[n] = 0.0
  223. case ast.STRINGS:
  224. r[n] = ""
  225. case ast.DATETIME:
  226. r[n] = conf.GetNow()
  227. case ast.BOOLEAN:
  228. r[n] = false
  229. case ast.BYTEA:
  230. r[n] = []byte{}
  231. default:
  232. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  233. }
  234. case *ast.ArrayType:
  235. if tempArr, err := p.addArrayField(st, nil); err != nil {
  236. return fmt.Errorf("fail to parse field %s: %s", n, err)
  237. } else {
  238. r[n] = tempArr
  239. }
  240. case *ast.RecType:
  241. r[n] = make(map[string]interface{})
  242. default:
  243. return fmt.Errorf("unsupported type %T", st)
  244. }
  245. }
  246. }
  247. return nil
  248. }
  249. //ft must be ast.ArrayType
  250. //side effect: r[p] will be set to the new array
  251. func (p *defaultFieldProcessor) addArrayField(ft *ast.ArrayType, srcSlice []interface{}) (interface{}, error) {
  252. if ft.FieldType != nil { //complex type array or struct
  253. switch st := ft.FieldType.(type) { //Only two complex types supported here
  254. case *ast.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  255. if srcSlice == nil {
  256. return [][]interface{}(nil), nil
  257. }
  258. var s []interface{}
  259. var tempSlice reflect.Value
  260. for i, t := range srcSlice {
  261. jtype := reflect.ValueOf(t).Kind()
  262. if t == nil {
  263. s = nil
  264. } else if jtype == reflect.Slice || jtype == reflect.Array {
  265. s = t.([]interface{})
  266. } else if jtype == reflect.String {
  267. err := json.Unmarshal([]byte(t.(string)), &s)
  268. if err != nil {
  269. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  270. }
  271. } else {
  272. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  273. }
  274. if tempArr, err := p.addArrayField(st, s); err != nil {
  275. return nil, err
  276. } else {
  277. if !tempSlice.IsValid() {
  278. s := reflect.TypeOf(tempArr)
  279. tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
  280. }
  281. tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
  282. }
  283. }
  284. return tempSlice.Interface(), nil
  285. case *ast.RecType:
  286. if srcSlice == nil {
  287. return []map[string]interface{}(nil), nil
  288. }
  289. tempSlice := make([]map[string]interface{}, 0)
  290. for i, t := range srcSlice {
  291. jtype := reflect.ValueOf(t).Kind()
  292. j := make(map[string]interface{})
  293. var ok bool
  294. if t == nil {
  295. j = nil
  296. tempSlice = append(tempSlice, j)
  297. continue
  298. } else if jtype == reflect.Map {
  299. j, ok = t.(map[string]interface{})
  300. if !ok {
  301. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  302. }
  303. } else if jtype == reflect.String {
  304. err := json.Unmarshal([]byte(t.(string)), &j)
  305. if err != nil {
  306. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  307. }
  308. } else {
  309. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  310. }
  311. r := make(map[string]interface{})
  312. for _, f := range st.StreamFields {
  313. n := f.Name
  314. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  315. return nil, e
  316. }
  317. }
  318. tempSlice = append(tempSlice, r)
  319. }
  320. return tempSlice, nil
  321. default:
  322. return nil, fmt.Errorf("unsupported type %T", st)
  323. }
  324. } else { //basic type
  325. switch ft.Type {
  326. case ast.UNKNOWN:
  327. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  328. case ast.BIGINT:
  329. if srcSlice == nil {
  330. return []int(nil), nil
  331. }
  332. tempSlice := make([]int, 0)
  333. for i, t := range srcSlice {
  334. jtype := reflect.ValueOf(t).Kind()
  335. if jtype == reflect.Float64 {
  336. tempSlice = append(tempSlice, int(t.(float64)))
  337. } else if jtype == reflect.String {
  338. if v, err := strconv.Atoi(t.(string)); err != nil {
  339. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  340. } else {
  341. tempSlice = append(tempSlice, v)
  342. }
  343. } else {
  344. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  345. }
  346. }
  347. return tempSlice, nil
  348. case ast.FLOAT:
  349. if srcSlice == nil {
  350. return []float64(nil), nil
  351. }
  352. tempSlice := make([]float64, 0)
  353. for i, t := range srcSlice {
  354. jtype := reflect.ValueOf(t).Kind()
  355. if jtype == reflect.Float64 {
  356. tempSlice = append(tempSlice, t.(float64))
  357. } else if jtype == reflect.String {
  358. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  359. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  360. } else {
  361. tempSlice = append(tempSlice, f)
  362. }
  363. } else {
  364. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  365. }
  366. }
  367. return tempSlice, nil
  368. case ast.STRINGS:
  369. if srcSlice == nil {
  370. return []string(nil), nil
  371. }
  372. tempSlice := make([]string, 0)
  373. for i, t := range srcSlice {
  374. if reflect.ValueOf(t).Kind() == reflect.String {
  375. tempSlice = append(tempSlice, t.(string))
  376. } else {
  377. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
  378. }
  379. }
  380. return tempSlice, nil
  381. case ast.DATETIME:
  382. if srcSlice == nil {
  383. return []time.Time(nil), nil
  384. }
  385. tempSlice := make([]time.Time, 0)
  386. for i, t := range srcSlice {
  387. jtype := reflect.ValueOf(t).Kind()
  388. switch jtype {
  389. case reflect.Int:
  390. ai := t.(int64)
  391. tempSlice = append(tempSlice, cast.TimeFromUnixMilli(ai))
  392. case reflect.Float64:
  393. ai := int64(t.(float64))
  394. tempSlice = append(tempSlice, cast.TimeFromUnixMilli(ai))
  395. case reflect.String:
  396. if ai, err := p.parseTime(t.(string)); err != nil {
  397. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
  398. } else {
  399. tempSlice = append(tempSlice, ai)
  400. }
  401. default:
  402. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
  403. }
  404. }
  405. return tempSlice, nil
  406. case ast.BOOLEAN:
  407. if srcSlice == nil {
  408. return []bool(nil), nil
  409. }
  410. tempSlice := make([]bool, 0)
  411. for i, t := range srcSlice {
  412. jtype := reflect.ValueOf(t).Kind()
  413. if jtype == reflect.Bool {
  414. tempSlice = append(tempSlice, t.(bool))
  415. } else if jtype == reflect.String {
  416. if v, err := strconv.ParseBool(t.(string)); err != nil {
  417. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  418. } else {
  419. tempSlice = append(tempSlice, v)
  420. }
  421. } else {
  422. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  423. }
  424. }
  425. return tempSlice, nil
  426. case ast.BYTEA:
  427. if srcSlice == nil {
  428. return [][]byte(nil), nil
  429. }
  430. tempSlice := make([][]byte, 0)
  431. for i, t := range srcSlice {
  432. jtype := reflect.ValueOf(t).Kind()
  433. if jtype == reflect.String {
  434. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  435. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v) which cannot base64 decode", i, t)
  436. } else {
  437. tempSlice = append(tempSlice, b)
  438. }
  439. } else if jtype == reflect.Slice {
  440. if b, ok := t.([]byte); ok {
  441. tempSlice = append(tempSlice, b)
  442. } else {
  443. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v)", i, t)
  444. }
  445. }
  446. }
  447. return tempSlice, nil
  448. default:
  449. return nil, fmt.Errorf("invalid data type for %T", ft.Type)
  450. }
  451. }
  452. }
  453. func (p *defaultFieldProcessor) parseTime(s string) (time.Time, error) {
  454. if p.timestampFormat != "" {
  455. return cast.ParseTime(s, p.timestampFormat)
  456. } else {
  457. return time.Parse(cast.JSISO, s)
  458. }
  459. }