field_processor.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. package operators
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "math"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "time"
  13. )
  14. type defaultFieldProcessor struct {
  15. streamFields []interface{}
  16. aliasFields xsql.Fields
  17. timestampFormat string
  18. isBinary bool
  19. }
  20. func (p *defaultFieldProcessor) processField(tuple *xsql.Tuple, fv *xsql.FunctionValuer) (map[string]interface{}, error) {
  21. result := make(map[string]interface{})
  22. if p.streamFields != nil {
  23. for _, f := range p.streamFields {
  24. switch sf := f.(type) {
  25. case *xsql.StreamField:
  26. if p.isBinary {
  27. tuple.Message[sf.Name] = tuple.Message[common.DEFAULT_FIELD]
  28. }
  29. if e := p.addRecField(sf.FieldType, result, tuple.Message, sf.Name); e != nil {
  30. return nil, e
  31. }
  32. case string: //schemaless
  33. if p.isBinary {
  34. result = tuple.Message
  35. } else {
  36. if m, ok := tuple.Message.Value(sf); ok {
  37. result[sf] = m
  38. }
  39. }
  40. }
  41. if p.isBinary {
  42. break //binary format should only have ONE field
  43. }
  44. }
  45. } else {
  46. result = tuple.Message
  47. }
  48. //If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
  49. //Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
  50. for _, f := range p.aliasFields {
  51. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv)}
  52. v := ve.Eval(f.Expr)
  53. if e, ok := v.(error); ok {
  54. return nil, e
  55. } else {
  56. result[f.AName] = v
  57. }
  58. }
  59. return result, nil
  60. }
  61. func (p *defaultFieldProcessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j xsql.Message, n string) error {
  62. if t, ok := j.Value(n); ok {
  63. v := reflect.ValueOf(t)
  64. jtype := v.Kind()
  65. switch st := ft.(type) {
  66. case *xsql.BasicType:
  67. switch st.Type {
  68. case xsql.UNKNOWN:
  69. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  70. case xsql.BIGINT:
  71. if jtype == reflect.Int {
  72. r[n] = t.(int)
  73. } else if jtype == reflect.Float64 {
  74. if tt, ok1 := t.(float64); ok1 {
  75. if tt > math.MaxInt64 {
  76. r[n] = uint64(tt)
  77. } else {
  78. r[n] = int(tt)
  79. }
  80. }
  81. } else if jtype == reflect.String {
  82. if i, err := strconv.Atoi(t.(string)); err != nil {
  83. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  84. } else {
  85. r[n] = i
  86. }
  87. } else if jtype == reflect.Uint64 {
  88. r[n] = t.(uint64)
  89. } else {
  90. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  91. }
  92. case xsql.FLOAT:
  93. if jtype == reflect.Float64 {
  94. r[n] = t.(float64)
  95. } else if jtype == reflect.String {
  96. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  97. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  98. } else {
  99. r[n] = f
  100. }
  101. } else {
  102. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  103. }
  104. case xsql.STRINGS:
  105. if jtype == reflect.String {
  106. r[n] = t.(string)
  107. } else {
  108. return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
  109. }
  110. case xsql.DATETIME:
  111. switch jtype {
  112. case reflect.Int:
  113. ai := t.(int64)
  114. r[n] = common.TimeFromUnixMilli(ai)
  115. case reflect.Float64:
  116. ai := int64(t.(float64))
  117. r[n] = common.TimeFromUnixMilli(ai)
  118. case reflect.String:
  119. if t, err := p.parseTime(t.(string)); err != nil {
  120. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  121. } else {
  122. r[n] = t
  123. }
  124. default:
  125. return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
  126. }
  127. case xsql.BOOLEAN:
  128. if jtype == reflect.Bool {
  129. r[n] = t.(bool)
  130. } else if jtype == reflect.String {
  131. if i, err := strconv.ParseBool(t.(string)); err != nil {
  132. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  133. } else {
  134. r[n] = i
  135. }
  136. } else {
  137. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  138. }
  139. case xsql.BYTEA:
  140. if jtype == reflect.String {
  141. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  142. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v) which cannot base64 decode", n, t)
  143. } else {
  144. r[n] = b
  145. }
  146. } else if jtype == reflect.Slice {
  147. if b, ok := t.([]byte); ok {
  148. r[n] = b
  149. } else {
  150. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v)", n, t)
  151. }
  152. }
  153. default:
  154. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  155. }
  156. case *xsql.ArrayType:
  157. var s []interface{}
  158. if t == nil {
  159. s = nil
  160. } else if jtype == reflect.Slice {
  161. s = t.([]interface{})
  162. } else if jtype == reflect.String {
  163. err := json.Unmarshal([]byte(t.(string)), &s)
  164. if err != nil {
  165. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  166. }
  167. } else {
  168. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  169. }
  170. if tempArr, err := p.addArrayField(st, s); err != nil {
  171. return fmt.Errorf("fail to parse field %s: %s", n, err)
  172. } else {
  173. r[n] = tempArr
  174. }
  175. case *xsql.RecType:
  176. nextJ := make(map[string]interface{})
  177. if t == nil {
  178. nextJ = nil
  179. r[n] = nextJ
  180. return nil
  181. } else if jtype == reflect.Map {
  182. nextJ, ok = t.(map[string]interface{})
  183. if !ok {
  184. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  185. }
  186. } else if jtype == reflect.String {
  187. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  188. if err != nil {
  189. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  190. }
  191. } else {
  192. return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
  193. }
  194. nextR := make(map[string]interface{})
  195. for _, nextF := range st.StreamFields {
  196. nextP := strings.ToLower(nextF.Name)
  197. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  198. return e
  199. }
  200. }
  201. r[n] = nextR
  202. default:
  203. return fmt.Errorf("unsupported type %T", st)
  204. }
  205. return nil
  206. } else {
  207. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  208. }
  209. }
  210. //ft must be xsql.ArrayType
  211. //side effect: r[p] will be set to the new array
  212. func (p *defaultFieldProcessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  213. if ft.FieldType != nil { //complex type array or struct
  214. switch st := ft.FieldType.(type) { //Only two complex types supported here
  215. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  216. if srcSlice == nil {
  217. return [][]interface{}(nil), nil
  218. }
  219. var s []interface{}
  220. var tempSlice reflect.Value
  221. for i, t := range srcSlice {
  222. jtype := reflect.ValueOf(t).Kind()
  223. if t == nil {
  224. s = nil
  225. } else if jtype == reflect.Slice || jtype == reflect.Array {
  226. s = t.([]interface{})
  227. } else if jtype == reflect.String {
  228. err := json.Unmarshal([]byte(t.(string)), &s)
  229. if err != nil {
  230. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  231. }
  232. } else {
  233. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  234. }
  235. if tempArr, err := p.addArrayField(st, s); err != nil {
  236. return nil, err
  237. } else {
  238. if !tempSlice.IsValid() {
  239. s := reflect.TypeOf(tempArr)
  240. tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
  241. }
  242. tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
  243. }
  244. }
  245. return tempSlice.Interface(), nil
  246. case *xsql.RecType:
  247. if srcSlice == nil {
  248. return []map[string]interface{}(nil), nil
  249. }
  250. tempSlice := make([]map[string]interface{}, 0)
  251. for i, t := range srcSlice {
  252. jtype := reflect.ValueOf(t).Kind()
  253. j := make(map[string]interface{})
  254. var ok bool
  255. if t == nil {
  256. j = nil
  257. tempSlice = append(tempSlice, j)
  258. continue
  259. } else if jtype == reflect.Map {
  260. j, ok = t.(map[string]interface{})
  261. if !ok {
  262. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  263. }
  264. } else if jtype == reflect.String {
  265. err := json.Unmarshal([]byte(t.(string)), &j)
  266. if err != nil {
  267. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  268. }
  269. } else {
  270. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  271. }
  272. r := make(map[string]interface{})
  273. for _, f := range st.StreamFields {
  274. n := f.Name
  275. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  276. return nil, e
  277. }
  278. }
  279. tempSlice = append(tempSlice, r)
  280. }
  281. return tempSlice, nil
  282. default:
  283. return nil, fmt.Errorf("unsupported type %T", st)
  284. }
  285. } else { //basic type
  286. switch ft.Type {
  287. case xsql.UNKNOWN:
  288. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  289. case xsql.BIGINT:
  290. if srcSlice == nil {
  291. return []int(nil), nil
  292. }
  293. tempSlice := make([]int, 0)
  294. for i, t := range srcSlice {
  295. jtype := reflect.ValueOf(t).Kind()
  296. if jtype == reflect.Float64 {
  297. tempSlice = append(tempSlice, int(t.(float64)))
  298. } else if jtype == reflect.String {
  299. if v, err := strconv.Atoi(t.(string)); err != nil {
  300. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  301. } else {
  302. tempSlice = append(tempSlice, v)
  303. }
  304. } else {
  305. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  306. }
  307. }
  308. return tempSlice, nil
  309. case xsql.FLOAT:
  310. if srcSlice == nil {
  311. return []float64(nil), nil
  312. }
  313. tempSlice := make([]float64, 0)
  314. for i, t := range srcSlice {
  315. jtype := reflect.ValueOf(t).Kind()
  316. if jtype == reflect.Float64 {
  317. tempSlice = append(tempSlice, t.(float64))
  318. } else if jtype == reflect.String {
  319. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  320. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  321. } else {
  322. tempSlice = append(tempSlice, f)
  323. }
  324. } else {
  325. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  326. }
  327. }
  328. return tempSlice, nil
  329. case xsql.STRINGS:
  330. if srcSlice == nil {
  331. return []string(nil), nil
  332. }
  333. tempSlice := make([]string, 0)
  334. for i, t := range srcSlice {
  335. if reflect.ValueOf(t).Kind() == reflect.String {
  336. tempSlice = append(tempSlice, t.(string))
  337. } else {
  338. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
  339. }
  340. }
  341. return tempSlice, nil
  342. case xsql.DATETIME:
  343. if srcSlice == nil {
  344. return []time.Time(nil), nil
  345. }
  346. tempSlice := make([]time.Time, 0)
  347. for i, t := range srcSlice {
  348. jtype := reflect.ValueOf(t).Kind()
  349. switch jtype {
  350. case reflect.Int:
  351. ai := t.(int64)
  352. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  353. case reflect.Float64:
  354. ai := int64(t.(float64))
  355. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  356. case reflect.String:
  357. if ai, err := p.parseTime(t.(string)); err != nil {
  358. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
  359. } else {
  360. tempSlice = append(tempSlice, ai)
  361. }
  362. default:
  363. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
  364. }
  365. }
  366. return tempSlice, nil
  367. case xsql.BOOLEAN:
  368. if srcSlice == nil {
  369. return []bool(nil), nil
  370. }
  371. tempSlice := make([]bool, 0)
  372. for i, t := range srcSlice {
  373. jtype := reflect.ValueOf(t).Kind()
  374. if jtype == reflect.Bool {
  375. tempSlice = append(tempSlice, t.(bool))
  376. } else if jtype == reflect.String {
  377. if v, err := strconv.ParseBool(t.(string)); err != nil {
  378. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  379. } else {
  380. tempSlice = append(tempSlice, v)
  381. }
  382. } else {
  383. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  384. }
  385. }
  386. return tempSlice, nil
  387. case xsql.BYTEA:
  388. if srcSlice == nil {
  389. return [][]byte(nil), nil
  390. }
  391. tempSlice := make([][]byte, 0)
  392. for i, t := range srcSlice {
  393. jtype := reflect.ValueOf(t).Kind()
  394. if jtype == reflect.String {
  395. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  396. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v) which cannot base64 decode", i, t)
  397. } else {
  398. tempSlice = append(tempSlice, b)
  399. }
  400. } else if jtype == reflect.Slice {
  401. if b, ok := t.([]byte); ok {
  402. tempSlice = append(tempSlice, b)
  403. } else {
  404. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v)", i, t)
  405. }
  406. }
  407. }
  408. return tempSlice, nil
  409. default:
  410. return nil, fmt.Errorf("invalid data type for %T", ft.Type)
  411. }
  412. }
  413. }
  414. func (p *defaultFieldProcessor) parseTime(s string) (time.Time, error) {
  415. if p.timestampFormat != "" {
  416. return common.ParseTime(s, p.timestampFormat)
  417. } else {
  418. return time.Parse(common.JSISO, s)
  419. }
  420. }