preprocessor.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. package operators
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "math"
  10. "reflect"
  11. "strconv"
  12. "strings"
  13. "time"
  14. )
  15. type Preprocessor struct {
  16. //Pruned stream fields. Could be streamField(with data type info) or string
  17. streamFields []interface{}
  18. aliasFields xsql.Fields
  19. allMeta bool
  20. metaFields []string //only needed if not allMeta
  21. isEventTime bool
  22. timestampField string
  23. timestampFormat string
  24. isBinary bool
  25. }
  26. func NewPreprocessor(fields []interface{}, fs xsql.Fields, allMeta bool, metaFields []string, iet bool, timestampField string, timestampFormat string, isBinary bool) (*Preprocessor, error) {
  27. p := &Preprocessor{streamFields: fields, aliasFields: fs, allMeta: allMeta, metaFields: metaFields, isEventTime: iet, isBinary: isBinary, timestampFormat: timestampFormat, timestampField: timestampField}
  28. return p, nil
  29. }
  30. /*
  31. * input: *xsql.Tuple
  32. * output: *xsql.Tuple
  33. */
  34. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  35. log := ctx.GetLogger()
  36. tuple, ok := data.(*xsql.Tuple)
  37. if !ok {
  38. return fmt.Errorf("expect tuple data type")
  39. }
  40. log.Debugf("preprocessor receive %s", tuple.Message)
  41. result := make(map[string]interface{})
  42. if p.streamFields != nil {
  43. for _, f := range p.streamFields {
  44. switch sf := f.(type) {
  45. case *xsql.StreamField:
  46. if p.isBinary {
  47. tuple.Message[sf.Name] = tuple.Message[common.DEFAULT_FIELD]
  48. }
  49. if e := p.addRecField(sf.FieldType, result, tuple.Message, sf.Name); e != nil {
  50. return fmt.Errorf("error in preprocessor: %s", e)
  51. }
  52. case string: //schemaless
  53. if p.isBinary {
  54. result = tuple.Message
  55. } else {
  56. if m, ok := tuple.Message.Value(sf); ok {
  57. result[sf] = m
  58. }
  59. }
  60. }
  61. if p.isBinary {
  62. break //binary format should only have ONE field
  63. }
  64. }
  65. } else {
  66. result = tuple.Message
  67. }
  68. //If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
  69. //Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
  70. for _, f := range p.aliasFields {
  71. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv)}
  72. v := ve.Eval(f.Expr)
  73. if _, ok := v.(error); ok {
  74. return v
  75. } else {
  76. result[f.AName] = v
  77. }
  78. }
  79. tuple.Message = result
  80. if p.isEventTime {
  81. if t, ok := result[p.timestampField]; ok {
  82. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  83. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  84. } else {
  85. tuple.Timestamp = ts
  86. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  87. }
  88. } else {
  89. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  90. }
  91. }
  92. if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 {
  93. newMeta := make(xsql.Metadata)
  94. for _, f := range p.metaFields {
  95. newMeta[f] = tuple.Metadata[f]
  96. }
  97. tuple.Metadata = newMeta
  98. }
  99. return tuple
  100. }
  101. func (p *Preprocessor) parseTime(s string) (time.Time, error) {
  102. if p.timestampFormat != "" {
  103. return common.ParseTime(s, p.timestampFormat)
  104. } else {
  105. return time.Parse(common.JSISO, s)
  106. }
  107. }
  108. func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j xsql.Message, n string) error {
  109. if t, ok := j.Value(n); ok {
  110. v := reflect.ValueOf(t)
  111. jtype := v.Kind()
  112. switch st := ft.(type) {
  113. case *xsql.BasicType:
  114. switch st.Type {
  115. case xsql.UNKNOWN:
  116. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  117. case xsql.BIGINT:
  118. if jtype == reflect.Int {
  119. r[n] = t.(int)
  120. } else if jtype == reflect.Float64 {
  121. if tt, ok1 := t.(float64); ok1 {
  122. if tt > math.MaxInt64 {
  123. r[n] = uint64(tt)
  124. } else {
  125. r[n] = int(tt)
  126. }
  127. }
  128. } else if jtype == reflect.String {
  129. if i, err := strconv.Atoi(t.(string)); err != nil {
  130. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  131. } else {
  132. r[n] = i
  133. }
  134. } else if jtype == reflect.Uint64 {
  135. r[n] = t.(uint64)
  136. } else {
  137. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  138. }
  139. case xsql.FLOAT:
  140. if jtype == reflect.Float64 {
  141. r[n] = t.(float64)
  142. } else if jtype == reflect.String {
  143. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  144. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  145. } else {
  146. r[n] = f
  147. }
  148. } else {
  149. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  150. }
  151. case xsql.STRINGS:
  152. if jtype == reflect.String {
  153. r[n] = t.(string)
  154. } else {
  155. return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
  156. }
  157. case xsql.DATETIME:
  158. switch jtype {
  159. case reflect.Int:
  160. ai := t.(int64)
  161. r[n] = common.TimeFromUnixMilli(ai)
  162. case reflect.Float64:
  163. ai := int64(t.(float64))
  164. r[n] = common.TimeFromUnixMilli(ai)
  165. case reflect.String:
  166. if t, err := p.parseTime(t.(string)); err != nil {
  167. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  168. } else {
  169. r[n] = t
  170. }
  171. default:
  172. return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
  173. }
  174. case xsql.BOOLEAN:
  175. if jtype == reflect.Bool {
  176. r[n] = t.(bool)
  177. } else if jtype == reflect.String {
  178. if i, err := strconv.ParseBool(t.(string)); err != nil {
  179. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  180. } else {
  181. r[n] = i
  182. }
  183. } else {
  184. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  185. }
  186. case xsql.BYTEA:
  187. if jtype == reflect.String {
  188. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  189. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v) which cannot base64 decode", n, t)
  190. } else {
  191. r[n] = b
  192. }
  193. } else if jtype == reflect.Slice {
  194. if b, ok := t.([]byte); ok {
  195. r[n] = b
  196. } else {
  197. return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v)", n, t)
  198. }
  199. }
  200. default:
  201. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  202. }
  203. case *xsql.ArrayType:
  204. var s []interface{}
  205. if t == nil {
  206. s = nil
  207. } else if jtype == reflect.Slice {
  208. s = t.([]interface{})
  209. } else if jtype == reflect.String {
  210. err := json.Unmarshal([]byte(t.(string)), &s)
  211. if err != nil {
  212. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  213. }
  214. } else {
  215. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  216. }
  217. if tempArr, err := p.addArrayField(st, s); err != nil {
  218. return fmt.Errorf("fail to parse field %s: %s", n, err)
  219. } else {
  220. r[n] = tempArr
  221. }
  222. case *xsql.RecType:
  223. nextJ := make(map[string]interface{})
  224. if t == nil {
  225. nextJ = nil
  226. r[n] = nextJ
  227. return nil
  228. } else if jtype == reflect.Map {
  229. nextJ, ok = t.(map[string]interface{})
  230. if !ok {
  231. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  232. }
  233. } else if jtype == reflect.String {
  234. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  235. if err != nil {
  236. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  237. }
  238. } else {
  239. return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
  240. }
  241. nextR := make(map[string]interface{})
  242. for _, nextF := range st.StreamFields {
  243. nextP := strings.ToLower(nextF.Name)
  244. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  245. return e
  246. }
  247. }
  248. r[n] = nextR
  249. default:
  250. return fmt.Errorf("unsupported type %T", st)
  251. }
  252. return nil
  253. } else {
  254. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  255. }
  256. }
  257. //ft must be xsql.ArrayType
  258. //side effect: r[p] will be set to the new array
  259. func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  260. if ft.FieldType != nil { //complex type array or struct
  261. switch st := ft.FieldType.(type) { //Only two complex types supported here
  262. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  263. if srcSlice == nil {
  264. return [][]interface{}(nil), nil
  265. }
  266. var s []interface{}
  267. var tempSlice reflect.Value
  268. for i, t := range srcSlice {
  269. jtype := reflect.ValueOf(t).Kind()
  270. if t == nil {
  271. s = nil
  272. } else if jtype == reflect.Slice || jtype == reflect.Array {
  273. s = t.([]interface{})
  274. } else if jtype == reflect.String {
  275. err := json.Unmarshal([]byte(t.(string)), &s)
  276. if err != nil {
  277. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  278. }
  279. } else {
  280. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  281. }
  282. if tempArr, err := p.addArrayField(st, s); err != nil {
  283. return nil, err
  284. } else {
  285. if !tempSlice.IsValid() {
  286. s := reflect.TypeOf(tempArr)
  287. tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
  288. }
  289. tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
  290. }
  291. }
  292. return tempSlice.Interface(), nil
  293. case *xsql.RecType:
  294. if srcSlice == nil {
  295. return []map[string]interface{}(nil), nil
  296. }
  297. tempSlice := make([]map[string]interface{}, 0)
  298. for i, t := range srcSlice {
  299. jtype := reflect.ValueOf(t).Kind()
  300. j := make(map[string]interface{})
  301. var ok bool
  302. if t == nil {
  303. j = nil
  304. tempSlice = append(tempSlice, j)
  305. continue
  306. } else if jtype == reflect.Map {
  307. j, ok = t.(map[string]interface{})
  308. if !ok {
  309. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  310. }
  311. } else if jtype == reflect.String {
  312. err := json.Unmarshal([]byte(t.(string)), &j)
  313. if err != nil {
  314. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  315. }
  316. } else {
  317. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  318. }
  319. r := make(map[string]interface{})
  320. for _, f := range st.StreamFields {
  321. n := f.Name
  322. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  323. return nil, e
  324. }
  325. }
  326. tempSlice = append(tempSlice, r)
  327. }
  328. return tempSlice, nil
  329. default:
  330. return nil, fmt.Errorf("unsupported type %T", st)
  331. }
  332. } else { //basic type
  333. switch ft.Type {
  334. case xsql.UNKNOWN:
  335. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  336. case xsql.BIGINT:
  337. if srcSlice == nil {
  338. return []int(nil), nil
  339. }
  340. tempSlice := make([]int, 0)
  341. for i, t := range srcSlice {
  342. jtype := reflect.ValueOf(t).Kind()
  343. if jtype == reflect.Float64 {
  344. tempSlice = append(tempSlice, int(t.(float64)))
  345. } else if jtype == reflect.String {
  346. if v, err := strconv.Atoi(t.(string)); err != nil {
  347. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  348. } else {
  349. tempSlice = append(tempSlice, v)
  350. }
  351. } else {
  352. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  353. }
  354. }
  355. return tempSlice, nil
  356. case xsql.FLOAT:
  357. if srcSlice == nil {
  358. return []float64(nil), nil
  359. }
  360. tempSlice := make([]float64, 0)
  361. for i, t := range srcSlice {
  362. jtype := reflect.ValueOf(t).Kind()
  363. if jtype == reflect.Float64 {
  364. tempSlice = append(tempSlice, t.(float64))
  365. } else if jtype == reflect.String {
  366. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  367. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  368. } else {
  369. tempSlice = append(tempSlice, f)
  370. }
  371. } else {
  372. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  373. }
  374. }
  375. return tempSlice, nil
  376. case xsql.STRINGS:
  377. if srcSlice == nil {
  378. return []string(nil), nil
  379. }
  380. tempSlice := make([]string, 0)
  381. for i, t := range srcSlice {
  382. if reflect.ValueOf(t).Kind() == reflect.String {
  383. tempSlice = append(tempSlice, t.(string))
  384. } else {
  385. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
  386. }
  387. }
  388. return tempSlice, nil
  389. case xsql.DATETIME:
  390. if srcSlice == nil {
  391. return []time.Time(nil), nil
  392. }
  393. tempSlice := make([]time.Time, 0)
  394. for i, t := range srcSlice {
  395. jtype := reflect.ValueOf(t).Kind()
  396. switch jtype {
  397. case reflect.Int:
  398. ai := t.(int64)
  399. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  400. case reflect.Float64:
  401. ai := int64(t.(float64))
  402. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  403. case reflect.String:
  404. if ai, err := p.parseTime(t.(string)); err != nil {
  405. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
  406. } else {
  407. tempSlice = append(tempSlice, ai)
  408. }
  409. default:
  410. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
  411. }
  412. }
  413. return tempSlice, nil
  414. case xsql.BOOLEAN:
  415. if srcSlice == nil {
  416. return []bool(nil), nil
  417. }
  418. tempSlice := make([]bool, 0)
  419. for i, t := range srcSlice {
  420. jtype := reflect.ValueOf(t).Kind()
  421. if jtype == reflect.Bool {
  422. tempSlice = append(tempSlice, t.(bool))
  423. } else if jtype == reflect.String {
  424. if v, err := strconv.ParseBool(t.(string)); err != nil {
  425. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  426. } else {
  427. tempSlice = append(tempSlice, v)
  428. }
  429. } else {
  430. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  431. }
  432. }
  433. return tempSlice, nil
  434. case xsql.BYTEA:
  435. if srcSlice == nil {
  436. return [][]byte(nil), nil
  437. }
  438. tempSlice := make([][]byte, 0)
  439. for i, t := range srcSlice {
  440. jtype := reflect.ValueOf(t).Kind()
  441. if jtype == reflect.String {
  442. if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
  443. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v) which cannot base64 decode", i, t)
  444. } else {
  445. tempSlice = append(tempSlice, b)
  446. }
  447. } else if jtype == reflect.Slice {
  448. if b, ok := t.([]byte); ok {
  449. tempSlice = append(tempSlice, b)
  450. } else {
  451. return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v)", i, t)
  452. }
  453. }
  454. }
  455. return tempSlice, nil
  456. default:
  457. return nil, fmt.Errorf("invalid data type for %T", ft.Type)
  458. }
  459. }
  460. }