field_processor.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. package operator
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/internal/xsql"
  7. "github.com/emqx/kuiper/pkg/ast"
  8. "github.com/emqx/kuiper/pkg/cast"
  9. "github.com/emqx/kuiper/pkg/message"
  10. "math"
  11. "reflect"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  16. type defaultFieldProcessor struct {
  17. streamFields []interface{}
  18. timestampFormat string
  19. isBinary bool
  20. }
  21. func (p *defaultFieldProcessor) processField(tuple *xsql.Tuple, fv *xsql.FunctionValuer) (map[string]interface{}, error) {
  22. result := make(map[string]interface{})
  23. if p.streamFields != nil {
  24. for _, f := range p.streamFields {
  25. switch sf := f.(type) {
  26. case *ast.StreamField:
  27. if p.isBinary {
  28. tuple.Message[sf.Name] = tuple.Message[message.DefaultField]
  29. }
  30. if e := p.addRecField(sf.FieldType, result, tuple.Message, sf.Name); e != nil {
  31. return nil, e
  32. }
  33. case string: //schemaless
  34. if p.isBinary {
  35. result = tuple.Message
  36. } else {
  37. if m, ok := tuple.Message.Value(sf); ok {
  38. result[sf] = m
  39. }
  40. }
  41. }
  42. if p.isBinary {
  43. break //binary format should only have ONE field
  44. }
  45. }
  46. } else {
  47. result = tuple.Message
  48. }
  49. return result, nil
  50. }
  51. func (p *defaultFieldProcessor) addRecField(ft ast.FieldType, r map[string]interface{}, j xsql.Message, n string) error {
  52. if t, ok := j.Value(n); ok {
  53. v := reflect.ValueOf(t)
  54. jtype := v.Kind()
  55. switch st := ft.(type) {
  56. case *ast.BasicType:
  57. switch st.Type {
  58. case ast.UNKNOWN:
  59. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  60. case ast.BIGINT:
  61. if jtype == reflect.Int {
  62. r[n] = t.(int)
  63. } else if jtype == reflect.Float64 {
  64. if tt, ok1 := t.(float64); ok1 {
  65. if tt > math.MaxInt64 {
  66. r[n] = uint64(tt)
  67. } else {
  68. r[n] = int(tt)
  69. }
  70. }
  71. } else if jtype == reflect.String {
  72. if i, err := strconv.Atoi(t.(string)); err != nil {
  73. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  74. } else {
  75. r[n] = i
  76. }
  77. } else if jtype == reflect.Uint64 {
  78. r[n] = t.(uint64)
  79. } else {
  80. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  81. }
  82. case ast.FLOAT:
  83. if jtype == reflect.Float64 {
  84. r[n] = t.(float64)
  85. } else if jtype == reflect.String {
  86. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  87. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  88. } else {
  89. r[n] = f
  90. }
  91. } else {
  92. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  93. }
  94. case ast.STRINGS:
  95. if jtype == reflect.String {
  96. r[n] = t.(string)
  97. } else {
  98. return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
  99. }
  100. case ast.DATETIME:
  101. switch jtype {
  102. case reflect.Int:
  103. ai := t.(int64)
  104. r[n] = cast.TimeFromUnixMilli(ai)
  105. case reflect.Float64:
  106. ai := int64(t.(float64))
  107. r[n] = cast.TimeFromUnixMilli(ai)
  108. case reflect.String:
  109. if t, err := p.parseTime(t.(string)); err != nil {
  110. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  111. } else {
  112. r[n] = t
  113. }
  114. default:
  115. return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
  116. }
  117. case ast.BOOLEAN:
  118. if jtype == reflect.Bool {
  119. r[n] = t.(bool)
  120. } else if jtype == reflect.String {
  121. if i, err := strconv.ParseBool(t.(string)); err != nil {
  122. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  123. } else {
  124. r[n] = i
  125. }
  126. } else {
  127. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  128. }
  129. case ast.BYTEA:
  130. if jtype == reflect.String {
  131. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  132. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v) which cannot base64 decode", n, t)
  133. } else {
  134. r[n] = b
  135. }
  136. } else if jtype == reflect.Slice {
  137. if b, ok := t.([]byte); ok {
  138. r[n] = b
  139. } else {
  140. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v)", n, t)
  141. }
  142. }
  143. default:
  144. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  145. }
  146. case *ast.ArrayType:
  147. var s []interface{}
  148. if t == nil {
  149. s = nil
  150. } else if jtype == reflect.Slice {
  151. s = t.([]interface{})
  152. } else if jtype == reflect.String {
  153. err := json.Unmarshal([]byte(t.(string)), &s)
  154. if err != nil {
  155. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  156. }
  157. } else {
  158. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  159. }
  160. if tempArr, err := p.addArrayField(st, s); err != nil {
  161. return fmt.Errorf("fail to parse field %s: %s", n, err)
  162. } else {
  163. r[n] = tempArr
  164. }
  165. case *ast.RecType:
  166. nextJ := make(map[string]interface{})
  167. if t == nil {
  168. nextJ = nil
  169. r[n] = nextJ
  170. return nil
  171. } else if jtype == reflect.Map {
  172. nextJ, ok = t.(map[string]interface{})
  173. if !ok {
  174. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  175. }
  176. } else if jtype == reflect.String {
  177. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  178. if err != nil {
  179. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  180. }
  181. } else {
  182. return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
  183. }
  184. nextR := make(map[string]interface{})
  185. for _, nextF := range st.StreamFields {
  186. nextP := strings.ToLower(nextF.Name)
  187. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  188. return e
  189. }
  190. }
  191. r[n] = nextR
  192. default:
  193. return fmt.Errorf("unsupported type %T", st)
  194. }
  195. return nil
  196. } else {
  197. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  198. }
  199. }
  200. //ft must be ast.ArrayType
  201. //side effect: r[p] will be set to the new array
  202. func (p *defaultFieldProcessor) addArrayField(ft *ast.ArrayType, srcSlice []interface{}) (interface{}, error) {
  203. if ft.FieldType != nil { //complex type array or struct
  204. switch st := ft.FieldType.(type) { //Only two complex types supported here
  205. case *ast.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  206. if srcSlice == nil {
  207. return [][]interface{}(nil), nil
  208. }
  209. var s []interface{}
  210. var tempSlice reflect.Value
  211. for i, t := range srcSlice {
  212. jtype := reflect.ValueOf(t).Kind()
  213. if t == nil {
  214. s = nil
  215. } else if jtype == reflect.Slice || jtype == reflect.Array {
  216. s = t.([]interface{})
  217. } else if jtype == reflect.String {
  218. err := json.Unmarshal([]byte(t.(string)), &s)
  219. if err != nil {
  220. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  221. }
  222. } else {
  223. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  224. }
  225. if tempArr, err := p.addArrayField(st, s); err != nil {
  226. return nil, err
  227. } else {
  228. if !tempSlice.IsValid() {
  229. s := reflect.TypeOf(tempArr)
  230. tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
  231. }
  232. tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
  233. }
  234. }
  235. return tempSlice.Interface(), nil
  236. case *ast.RecType:
  237. if srcSlice == nil {
  238. return []map[string]interface{}(nil), nil
  239. }
  240. tempSlice := make([]map[string]interface{}, 0)
  241. for i, t := range srcSlice {
  242. jtype := reflect.ValueOf(t).Kind()
  243. j := make(map[string]interface{})
  244. var ok bool
  245. if t == nil {
  246. j = nil
  247. tempSlice = append(tempSlice, j)
  248. continue
  249. } else if jtype == reflect.Map {
  250. j, ok = t.(map[string]interface{})
  251. if !ok {
  252. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  253. }
  254. } else if jtype == reflect.String {
  255. err := json.Unmarshal([]byte(t.(string)), &j)
  256. if err != nil {
  257. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  258. }
  259. } else {
  260. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  261. }
  262. r := make(map[string]interface{})
  263. for _, f := range st.StreamFields {
  264. n := f.Name
  265. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  266. return nil, e
  267. }
  268. }
  269. tempSlice = append(tempSlice, r)
  270. }
  271. return tempSlice, nil
  272. default:
  273. return nil, fmt.Errorf("unsupported type %T", st)
  274. }
  275. } else { //basic type
  276. switch ft.Type {
  277. case ast.UNKNOWN:
  278. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  279. case ast.BIGINT:
  280. if srcSlice == nil {
  281. return []int(nil), nil
  282. }
  283. tempSlice := make([]int, 0)
  284. for i, t := range srcSlice {
  285. jtype := reflect.ValueOf(t).Kind()
  286. if jtype == reflect.Float64 {
  287. tempSlice = append(tempSlice, int(t.(float64)))
  288. } else if jtype == reflect.String {
  289. if v, err := strconv.Atoi(t.(string)); err != nil {
  290. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  291. } else {
  292. tempSlice = append(tempSlice, v)
  293. }
  294. } else {
  295. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  296. }
  297. }
  298. return tempSlice, nil
  299. case ast.FLOAT:
  300. if srcSlice == nil {
  301. return []float64(nil), nil
  302. }
  303. tempSlice := make([]float64, 0)
  304. for i, t := range srcSlice {
  305. jtype := reflect.ValueOf(t).Kind()
  306. if jtype == reflect.Float64 {
  307. tempSlice = append(tempSlice, t.(float64))
  308. } else if jtype == reflect.String {
  309. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  310. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  311. } else {
  312. tempSlice = append(tempSlice, f)
  313. }
  314. } else {
  315. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  316. }
  317. }
  318. return tempSlice, nil
  319. case ast.STRINGS:
  320. if srcSlice == nil {
  321. return []string(nil), nil
  322. }
  323. tempSlice := make([]string, 0)
  324. for i, t := range srcSlice {
  325. if reflect.ValueOf(t).Kind() == reflect.String {
  326. tempSlice = append(tempSlice, t.(string))
  327. } else {
  328. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
  329. }
  330. }
  331. return tempSlice, nil
  332. case ast.DATETIME:
  333. if srcSlice == nil {
  334. return []time.Time(nil), nil
  335. }
  336. tempSlice := make([]time.Time, 0)
  337. for i, t := range srcSlice {
  338. jtype := reflect.ValueOf(t).Kind()
  339. switch jtype {
  340. case reflect.Int:
  341. ai := t.(int64)
  342. tempSlice = append(tempSlice, cast.TimeFromUnixMilli(ai))
  343. case reflect.Float64:
  344. ai := int64(t.(float64))
  345. tempSlice = append(tempSlice, cast.TimeFromUnixMilli(ai))
  346. case reflect.String:
  347. if ai, err := p.parseTime(t.(string)); err != nil {
  348. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
  349. } else {
  350. tempSlice = append(tempSlice, ai)
  351. }
  352. default:
  353. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
  354. }
  355. }
  356. return tempSlice, nil
  357. case ast.BOOLEAN:
  358. if srcSlice == nil {
  359. return []bool(nil), nil
  360. }
  361. tempSlice := make([]bool, 0)
  362. for i, t := range srcSlice {
  363. jtype := reflect.ValueOf(t).Kind()
  364. if jtype == reflect.Bool {
  365. tempSlice = append(tempSlice, t.(bool))
  366. } else if jtype == reflect.String {
  367. if v, err := strconv.ParseBool(t.(string)); err != nil {
  368. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  369. } else {
  370. tempSlice = append(tempSlice, v)
  371. }
  372. } else {
  373. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  374. }
  375. }
  376. return tempSlice, nil
  377. case ast.BYTEA:
  378. if srcSlice == nil {
  379. return [][]byte(nil), nil
  380. }
  381. tempSlice := make([][]byte, 0)
  382. for i, t := range srcSlice {
  383. jtype := reflect.ValueOf(t).Kind()
  384. if jtype == reflect.String {
  385. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  386. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v) which cannot base64 decode", i, t)
  387. } else {
  388. tempSlice = append(tempSlice, b)
  389. }
  390. } else if jtype == reflect.Slice {
  391. if b, ok := t.([]byte); ok {
  392. tempSlice = append(tempSlice, b)
  393. } else {
  394. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v)", i, t)
  395. }
  396. }
  397. }
  398. return tempSlice, nil
  399. default:
  400. return nil, fmt.Errorf("invalid data type for %T", ft.Type)
  401. }
  402. }
  403. }
  404. func (p *defaultFieldProcessor) parseTime(s string) (time.Time, error) {
  405. if p.timestampFormat != "" {
  406. return cast.ParseTime(s, p.timestampFormat)
  407. } else {
  408. return time.Parse(cast.JSISO, s)
  409. }
  410. }