// Copyright 2021 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

  14. package operator
  15. import (
  16. "encoding/base64"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/xsql"
  20. "github.com/lf-edge/ekuiper/pkg/ast"
  21. "github.com/lf-edge/ekuiper/pkg/cast"
  22. "github.com/lf-edge/ekuiper/pkg/message"
  23. "math"
  24. "reflect"
  25. "strconv"
  26. "strings"
  27. "time"
  28. )
  29. type defaultFieldProcessor struct {
  30. streamFields []interface{}
  31. timestampFormat string
  32. isBinary bool
  33. }
  34. func (p *defaultFieldProcessor) processField(tuple *xsql.Tuple, fv *xsql.FunctionValuer) (map[string]interface{}, error) {
  35. result := make(map[string]interface{})
  36. if p.streamFields != nil {
  37. for _, f := range p.streamFields {
  38. switch sf := f.(type) {
  39. case *ast.StreamField:
  40. if p.isBinary {
  41. tuple.Message[sf.Name] = tuple.Message[message.DefaultField]
  42. }
  43. if e := p.addRecField(sf.FieldType, result, tuple.Message, sf.Name); e != nil {
  44. return nil, e
  45. }
  46. case string: //schemaless
  47. if p.isBinary {
  48. result = tuple.Message
  49. } else {
  50. if m, ok := tuple.Message.Value(sf); ok {
  51. result[sf] = m
  52. }
  53. }
  54. }
  55. if p.isBinary {
  56. break //binary format should only have ONE field
  57. }
  58. }
  59. } else {
  60. result = tuple.Message
  61. }
  62. return result, nil
  63. }
  64. func (p *defaultFieldProcessor) addRecField(ft ast.FieldType, r map[string]interface{}, j xsql.Message, n string) error {
  65. if t, ok := j.Value(n); ok {
  66. v := reflect.ValueOf(t)
  67. jtype := v.Kind()
  68. switch st := ft.(type) {
  69. case *ast.BasicType:
  70. switch st.Type {
  71. case ast.UNKNOWN:
  72. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  73. case ast.BIGINT:
  74. if jtype == reflect.Int {
  75. r[n] = t.(int)
  76. } else if jtype == reflect.Float64 {
  77. if tt, ok1 := t.(float64); ok1 {
  78. if tt > math.MaxInt64 {
  79. r[n] = uint64(tt)
  80. } else {
  81. r[n] = int(tt)
  82. }
  83. }
  84. } else if jtype == reflect.String {
  85. if i, err := strconv.Atoi(t.(string)); err != nil {
  86. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  87. } else {
  88. r[n] = i
  89. }
  90. } else if jtype == reflect.Uint64 {
  91. r[n] = t.(uint64)
  92. } else {
  93. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  94. }
  95. case ast.FLOAT:
  96. if jtype == reflect.Float64 {
  97. r[n] = t.(float64)
  98. } else if jtype == reflect.String {
  99. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  100. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  101. } else {
  102. r[n] = f
  103. }
  104. } else {
  105. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  106. }
  107. case ast.STRINGS:
  108. if jtype == reflect.String {
  109. r[n] = t.(string)
  110. } else {
  111. return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
  112. }
  113. case ast.DATETIME:
  114. switch jtype {
  115. case reflect.Int:
  116. ai := t.(int64)
  117. r[n] = cast.TimeFromUnixMilli(ai)
  118. case reflect.Float64:
  119. ai := int64(t.(float64))
  120. r[n] = cast.TimeFromUnixMilli(ai)
  121. case reflect.String:
  122. if t, err := p.parseTime(t.(string)); err != nil {
  123. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  124. } else {
  125. r[n] = t
  126. }
  127. default:
  128. return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
  129. }
  130. case ast.BOOLEAN:
  131. if jtype == reflect.Bool {
  132. r[n] = t.(bool)
  133. } else if jtype == reflect.String {
  134. if i, err := strconv.ParseBool(t.(string)); err != nil {
  135. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  136. } else {
  137. r[n] = i
  138. }
  139. } else {
  140. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  141. }
  142. case ast.BYTEA:
  143. if jtype == reflect.String {
  144. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  145. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v) which cannot base64 decode", n, t)
  146. } else {
  147. r[n] = b
  148. }
  149. } else if jtype == reflect.Slice {
  150. if b, ok := t.([]byte); ok {
  151. r[n] = b
  152. } else {
  153. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v)", n, t)
  154. }
  155. }
  156. default:
  157. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  158. }
  159. case *ast.ArrayType:
  160. var s []interface{}
  161. if t == nil {
  162. s = nil
  163. } else if jtype == reflect.Slice {
  164. s = t.([]interface{})
  165. } else if jtype == reflect.String {
  166. err := json.Unmarshal([]byte(t.(string)), &s)
  167. if err != nil {
  168. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  169. }
  170. } else {
  171. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  172. }
  173. if tempArr, err := p.addArrayField(st, s); err != nil {
  174. return fmt.Errorf("fail to parse field %s: %s", n, err)
  175. } else {
  176. r[n] = tempArr
  177. }
  178. case *ast.RecType:
  179. nextJ := make(map[string]interface{})
  180. if t == nil {
  181. nextJ = nil
  182. r[n] = nextJ
  183. return nil
  184. } else if jtype == reflect.Map {
  185. nextJ, ok = t.(map[string]interface{})
  186. if !ok {
  187. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  188. }
  189. } else if jtype == reflect.String {
  190. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  191. if err != nil {
  192. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  193. }
  194. } else {
  195. return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
  196. }
  197. nextR := make(map[string]interface{})
  198. for _, nextF := range st.StreamFields {
  199. nextP := strings.ToLower(nextF.Name)
  200. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  201. return e
  202. }
  203. }
  204. r[n] = nextR
  205. default:
  206. return fmt.Errorf("unsupported type %T", st)
  207. }
  208. return nil
  209. } else {
  210. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  211. }
  212. }
  213. //ft must be ast.ArrayType
  214. //side effect: r[p] will be set to the new array
  215. func (p *defaultFieldProcessor) addArrayField(ft *ast.ArrayType, srcSlice []interface{}) (interface{}, error) {
  216. if ft.FieldType != nil { //complex type array or struct
  217. switch st := ft.FieldType.(type) { //Only two complex types supported here
  218. case *ast.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  219. if srcSlice == nil {
  220. return [][]interface{}(nil), nil
  221. }
  222. var s []interface{}
  223. var tempSlice reflect.Value
  224. for i, t := range srcSlice {
  225. jtype := reflect.ValueOf(t).Kind()
  226. if t == nil {
  227. s = nil
  228. } else if jtype == reflect.Slice || jtype == reflect.Array {
  229. s = t.([]interface{})
  230. } else if jtype == reflect.String {
  231. err := json.Unmarshal([]byte(t.(string)), &s)
  232. if err != nil {
  233. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  234. }
  235. } else {
  236. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  237. }
  238. if tempArr, err := p.addArrayField(st, s); err != nil {
  239. return nil, err
  240. } else {
  241. if !tempSlice.IsValid() {
  242. s := reflect.TypeOf(tempArr)
  243. tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
  244. }
  245. tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
  246. }
  247. }
  248. return tempSlice.Interface(), nil
  249. case *ast.RecType:
  250. if srcSlice == nil {
  251. return []map[string]interface{}(nil), nil
  252. }
  253. tempSlice := make([]map[string]interface{}, 0)
  254. for i, t := range srcSlice {
  255. jtype := reflect.ValueOf(t).Kind()
  256. j := make(map[string]interface{})
  257. var ok bool
  258. if t == nil {
  259. j = nil
  260. tempSlice = append(tempSlice, j)
  261. continue
  262. } else if jtype == reflect.Map {
  263. j, ok = t.(map[string]interface{})
  264. if !ok {
  265. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  266. }
  267. } else if jtype == reflect.String {
  268. err := json.Unmarshal([]byte(t.(string)), &j)
  269. if err != nil {
  270. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  271. }
  272. } else {
  273. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  274. }
  275. r := make(map[string]interface{})
  276. for _, f := range st.StreamFields {
  277. n := f.Name
  278. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  279. return nil, e
  280. }
  281. }
  282. tempSlice = append(tempSlice, r)
  283. }
  284. return tempSlice, nil
  285. default:
  286. return nil, fmt.Errorf("unsupported type %T", st)
  287. }
  288. } else { //basic type
  289. switch ft.Type {
  290. case ast.UNKNOWN:
  291. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  292. case ast.BIGINT:
  293. if srcSlice == nil {
  294. return []int(nil), nil
  295. }
  296. tempSlice := make([]int, 0)
  297. for i, t := range srcSlice {
  298. jtype := reflect.ValueOf(t).Kind()
  299. if jtype == reflect.Float64 {
  300. tempSlice = append(tempSlice, int(t.(float64)))
  301. } else if jtype == reflect.String {
  302. if v, err := strconv.Atoi(t.(string)); err != nil {
  303. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  304. } else {
  305. tempSlice = append(tempSlice, v)
  306. }
  307. } else {
  308. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  309. }
  310. }
  311. return tempSlice, nil
  312. case ast.FLOAT:
  313. if srcSlice == nil {
  314. return []float64(nil), nil
  315. }
  316. tempSlice := make([]float64, 0)
  317. for i, t := range srcSlice {
  318. jtype := reflect.ValueOf(t).Kind()
  319. if jtype == reflect.Float64 {
  320. tempSlice = append(tempSlice, t.(float64))
  321. } else if jtype == reflect.String {
  322. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  323. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  324. } else {
  325. tempSlice = append(tempSlice, f)
  326. }
  327. } else {
  328. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  329. }
  330. }
  331. return tempSlice, nil
  332. case ast.STRINGS:
  333. if srcSlice == nil {
  334. return []string(nil), nil
  335. }
  336. tempSlice := make([]string, 0)
  337. for i, t := range srcSlice {
  338. if reflect.ValueOf(t).Kind() == reflect.String {
  339. tempSlice = append(tempSlice, t.(string))
  340. } else {
  341. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
  342. }
  343. }
  344. return tempSlice, nil
  345. case ast.DATETIME:
  346. if srcSlice == nil {
  347. return []time.Time(nil), nil
  348. }
  349. tempSlice := make([]time.Time, 0)
  350. for i, t := range srcSlice {
  351. jtype := reflect.ValueOf(t).Kind()
  352. switch jtype {
  353. case reflect.Int:
  354. ai := t.(int64)
  355. tempSlice = append(tempSlice, cast.TimeFromUnixMilli(ai))
  356. case reflect.Float64:
  357. ai := int64(t.(float64))
  358. tempSlice = append(tempSlice, cast.TimeFromUnixMilli(ai))
  359. case reflect.String:
  360. if ai, err := p.parseTime(t.(string)); err != nil {
  361. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
  362. } else {
  363. tempSlice = append(tempSlice, ai)
  364. }
  365. default:
  366. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
  367. }
  368. }
  369. return tempSlice, nil
  370. case ast.BOOLEAN:
  371. if srcSlice == nil {
  372. return []bool(nil), nil
  373. }
  374. tempSlice := make([]bool, 0)
  375. for i, t := range srcSlice {
  376. jtype := reflect.ValueOf(t).Kind()
  377. if jtype == reflect.Bool {
  378. tempSlice = append(tempSlice, t.(bool))
  379. } else if jtype == reflect.String {
  380. if v, err := strconv.ParseBool(t.(string)); err != nil {
  381. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  382. } else {
  383. tempSlice = append(tempSlice, v)
  384. }
  385. } else {
  386. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  387. }
  388. }
  389. return tempSlice, nil
  390. case ast.BYTEA:
  391. if srcSlice == nil {
  392. return [][]byte(nil), nil
  393. }
  394. tempSlice := make([][]byte, 0)
  395. for i, t := range srcSlice {
  396. jtype := reflect.ValueOf(t).Kind()
  397. if jtype == reflect.String {
  398. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  399. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v) which cannot base64 decode", i, t)
  400. } else {
  401. tempSlice = append(tempSlice, b)
  402. }
  403. } else if jtype == reflect.Slice {
  404. if b, ok := t.([]byte); ok {
  405. tempSlice = append(tempSlice, b)
  406. } else {
  407. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v)", i, t)
  408. }
  409. }
  410. }
  411. return tempSlice, nil
  412. default:
  413. return nil, fmt.Errorf("invalid data type for %T", ft.Type)
  414. }
  415. }
  416. }
  417. func (p *defaultFieldProcessor) parseTime(s string) (time.Time, error) {
  418. if p.timestampFormat != "" {
  419. return cast.ParseTime(s, p.timestampFormat)
  420. } else {
  421. return time.Parse(cast.JSISO, s)
  422. }
  423. }