123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413 |
- package plans
- import (
- "encoding/json"
- "fmt"
- "github.com/emqx/kuiper/common"
- "github.com/emqx/kuiper/xsql"
- "github.com/emqx/kuiper/xstream/api"
- "reflect"
- "strconv"
- "strings"
- "time"
- )
- type Preprocessor struct {
- streamStmt *xsql.StreamStmt
- fields xsql.Fields
- isEventTime bool
- timestampField string
- timestampFormat string
- }
- func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocessor, error) {
- p := &Preprocessor{streamStmt: s, fields: fs, isEventTime: iet}
- if iet {
- if tf, ok := s.Options["TIMESTAMP"]; ok {
- p.timestampField = tf
- } else {
- return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
- }
- if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok {
- p.timestampFormat = ts
- }
- }
- return p, nil
- }
- /*
- * input: *xsql.Tuple
- * output: *xsql.Tuple
- */
- func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{} {
- log := ctx.GetLogger()
- tuple, ok := data.(*xsql.Tuple)
- if !ok {
- return fmt.Errorf("expect tuple data type")
- }
- log.Debugf("preprocessor receive %s", tuple.Message)
- result := make(map[string]interface{})
- if p.streamStmt.StreamFields != nil {
- for _, f := range p.streamStmt.StreamFields {
- fname := strings.ToLower(f.Name)
- if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil {
- return fmt.Errorf("error in preprocessor: %s", e)
- }
- }
- } else {
- result = tuple.Message
- }
- //If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
- //Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
- for _, f := range p.fields {
- if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
- ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
- v := ve.Eval(f.Expr)
- if _, ok := v.(error); ok {
- return v
- } else {
- result[strings.ToLower(f.AName)] = v
- }
- }
- }
- tuple.Message = result
- if p.isEventTime {
- if t, ok := result[p.timestampField]; ok {
- if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
- return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
- } else {
- tuple.Timestamp = ts
- log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
- }
- } else {
- return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
- }
- }
- return tuple
- }
- func (p *Preprocessor) parseTime(s string) (time.Time, error) {
- if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok {
- return common.ParseTime(s, f)
- } else {
- return time.Parse(common.JSISO, s)
- }
- }
- func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j map[string]interface{}, n string) error {
- if t, ok := j[n]; ok {
- v := reflect.ValueOf(t)
- jtype := v.Kind()
- switch st := ft.(type) {
- case *xsql.BasicType:
- switch st.Type {
- case xsql.UNKNOWN:
- return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
- case xsql.BIGINT:
- if jtype == reflect.Int {
- r[n] = t.(int)
- } else if jtype == reflect.Float64 {
- r[n] = int(t.(float64))
- } else if jtype == reflect.String {
- if i, err := strconv.Atoi(t.(string)); err != nil {
- return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
- } else {
- r[n] = i
- }
- } else {
- return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
- }
- case xsql.FLOAT:
- if jtype == reflect.Float64 {
- r[n] = t.(float64)
- } else if jtype == reflect.String {
- if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
- return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
- } else {
- r[n] = f
- }
- } else {
- return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
- }
- case xsql.STRINGS:
- if jtype == reflect.String {
- r[n] = t.(string)
- } else {
- return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
- }
- case xsql.DATETIME:
- switch jtype {
- case reflect.Int:
- ai := t.(int64)
- r[n] = common.TimeFromUnixMilli(ai)
- case reflect.Float64:
- ai := int64(t.(float64))
- r[n] = common.TimeFromUnixMilli(ai)
- case reflect.String:
- if t, err := p.parseTime(t.(string)); err != nil {
- return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
- } else {
- r[n] = t
- }
- default:
- return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
- }
- case xsql.BOOLEAN:
- if jtype == reflect.Bool {
- r[n] = t.(bool)
- } else if jtype == reflect.String {
- if i, err := strconv.ParseBool(t.(string)); err != nil {
- return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
- } else {
- r[n] = i
- }
- } else {
- return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
- }
- default:
- return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
- }
- case *xsql.ArrayType:
- var s []interface{}
- if t == nil {
- s = nil
- } else if jtype == reflect.Slice {
- s = t.([]interface{})
- } else if jtype == reflect.String {
- err := json.Unmarshal([]byte(t.(string)), &s)
- if err != nil {
- return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
- }
- } else {
- return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
- }
- if tempArr, err := p.addArrayField(st, s); err != nil {
- return fmt.Errorf("fail to parse field %s: %s", n, err)
- } else {
- r[n] = tempArr
- }
- case *xsql.RecType:
- nextJ := make(map[string]interface{})
- if t == nil {
- nextJ = nil
- r[n] = nextJ
- return nil
- } else if jtype == reflect.Map {
- nextJ, ok = t.(map[string]interface{})
- if !ok {
- return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
- }
- } else if jtype == reflect.String {
- err := json.Unmarshal([]byte(t.(string)), &nextJ)
- if err != nil {
- return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
- }
- } else {
- return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
- }
- nextR := make(map[string]interface{})
- for _, nextF := range st.StreamFields {
- nextP := strings.ToLower(nextF.Name)
- if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
- return e
- }
- }
- r[n] = nextR
- default:
- return fmt.Errorf("unsupported type %T", st)
- }
- return nil
- } else {
- return fmt.Errorf("invalid data %s, field %s not found", j, n)
- }
- }
- //ft must be xsql.ArrayType
- //side effect: r[p] will be set to the new array
- func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
- if ft.FieldType != nil { //complex type array or struct
- switch st := ft.FieldType.(type) { //Only two complex types supported here
- case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
- if srcSlice == nil {
- return [][]interface{}(nil), nil
- }
- var s []interface{}
- var tempSlice reflect.Value
- for i, t := range srcSlice {
- jtype := reflect.ValueOf(t).Kind()
- if t == nil {
- s = nil
- } else if jtype == reflect.Slice || jtype == reflect.Array {
- s = t.([]interface{})
- } else if jtype == reflect.String {
- err := json.Unmarshal([]byte(t.(string)), &s)
- if err != nil {
- return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
- }
- } else {
- return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
- }
- if tempArr, err := p.addArrayField(st, s); err != nil {
- return nil, err
- } else {
- if !tempSlice.IsValid() {
- s := reflect.TypeOf(tempArr)
- tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
- }
- tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
- }
- }
- return tempSlice.Interface(), nil
- case *xsql.RecType:
- if srcSlice == nil {
- return []map[string]interface{}(nil), nil
- }
- tempSlice := make([]map[string]interface{}, 0)
- for i, t := range srcSlice {
- jtype := reflect.ValueOf(t).Kind()
- j := make(map[string]interface{})
- var ok bool
- if t == nil {
- j = nil
- tempSlice = append(tempSlice, j)
- continue
- } else if jtype == reflect.Map {
- j, ok = t.(map[string]interface{})
- if !ok {
- return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
- }
- } else if jtype == reflect.String {
- err := json.Unmarshal([]byte(t.(string)), &j)
- if err != nil {
- return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
- }
- } else {
- return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
- }
- r := make(map[string]interface{})
- for _, f := range st.StreamFields {
- n := f.Name
- if e := p.addRecField(f.FieldType, r, j, n); e != nil {
- return nil, e
- }
- }
- tempSlice = append(tempSlice, r)
- }
- return tempSlice, nil
- default:
- return nil, fmt.Errorf("unsupported type %T", st)
- }
- } else { //basic type
- switch ft.Type {
- case xsql.UNKNOWN:
- return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
- case xsql.BIGINT:
- if srcSlice == nil {
- return []int(nil), nil
- }
- tempSlice := make([]int, 0)
- for i, t := range srcSlice {
- jtype := reflect.ValueOf(t).Kind()
- if jtype == reflect.Float64 {
- tempSlice = append(tempSlice, int(t.(float64)))
- } else if jtype == reflect.String {
- if v, err := strconv.Atoi(t.(string)); err != nil {
- return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
- } else {
- tempSlice = append(tempSlice, v)
- }
- } else {
- return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
- }
- }
- return tempSlice, nil
- case xsql.FLOAT:
- if srcSlice == nil {
- return []float64(nil), nil
- }
- tempSlice := make([]float64, 0)
- for i, t := range srcSlice {
- jtype := reflect.ValueOf(t).Kind()
- if jtype == reflect.Float64 {
- tempSlice = append(tempSlice, t.(float64))
- } else if jtype == reflect.String {
- if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
- return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
- } else {
- tempSlice = append(tempSlice, f)
- }
- } else {
- return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
- }
- }
- return tempSlice, nil
- case xsql.STRINGS:
- if srcSlice == nil {
- return []string(nil), nil
- }
- tempSlice := make([]string, 0)
- for i, t := range srcSlice {
- if reflect.ValueOf(t).Kind() == reflect.String {
- tempSlice = append(tempSlice, t.(string))
- } else {
- return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
- }
- }
- return tempSlice, nil
- case xsql.DATETIME:
- if srcSlice == nil {
- return []time.Time(nil), nil
- }
- tempSlice := make([]time.Time, 0)
- for i, t := range srcSlice {
- jtype := reflect.ValueOf(t).Kind()
- switch jtype {
- case reflect.Int:
- ai := t.(int64)
- tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
- case reflect.Float64:
- ai := int64(t.(float64))
- tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
- case reflect.String:
- if ai, err := p.parseTime(t.(string)); err != nil {
- return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
- } else {
- tempSlice = append(tempSlice, ai)
- }
- default:
- return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
- }
- }
- return tempSlice, nil
- case xsql.BOOLEAN:
- if srcSlice == nil {
- return []bool(nil), nil
- }
- tempSlice := make([]bool, 0)
- for i, t := range srcSlice {
- jtype := reflect.ValueOf(t).Kind()
- if jtype == reflect.Bool {
- tempSlice = append(tempSlice, t.(bool))
- } else if jtype == reflect.String {
- if v, err := strconv.ParseBool(t.(string)); err != nil {
- return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
- } else {
- tempSlice = append(tempSlice, v)
- }
- } else {
- return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
- }
- }
- return tempSlice, nil
- default:
- return nil, fmt.Errorf("invalid data type for %T", ft.Type)
- }
- }
- }
|