preprocessor.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. package plans
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "fmt"
  7. "reflect"
  8. "strings"
  9. "time"
  10. )
  11. type Preprocessor struct {
  12. streamStmt *xsql.StreamStmt
  13. fields xsql.Fields
  14. isEventTime bool
  15. timestampField string
  16. timestampFormat string
  17. }
  18. func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocessor, error){
  19. p := &Preprocessor{streamStmt: s, fields: fs, isEventTime: iet}
  20. if iet {
  21. if tf, ok := s.Options["TIMESTAMP"]; ok{
  22. p.timestampField = tf
  23. }else{
  24. return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
  25. }
  26. if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok{
  27. p.timestampFormat = ts
  28. }
  29. }
  30. return p, nil
  31. }
  32. /*
  33. * input: *xsql.Tuple
  34. * output: *xsql.Tuple
  35. */
  36. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{} {
  37. log := ctx.GetLogger()
  38. tuple, ok := data.(*xsql.Tuple)
  39. if !ok {
  40. log.Errorf("Expect tuple data type")
  41. return nil
  42. }
  43. log.Debugf("preprocessor receive %s", tuple.Message)
  44. result := make(map[string]interface{})
  45. for _, f := range p.streamStmt.StreamFields {
  46. fname := strings.ToLower(f.Name)
  47. if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil{
  48. log.Errorf("error in preprocessor: %s", e)
  49. return nil
  50. }
  51. }
  52. //If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
  53. //Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
  54. for _, f := range p.fields {
  55. if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
  56. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
  57. if v := ve.Eval(f.Expr); v != nil {
  58. result[strings.ToLower(f.AName)] = v
  59. }
  60. }
  61. }
  62. tuple.Message = result
  63. if p.isEventTime{
  64. if t, ok := result[p.timestampField]; ok{
  65. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil{
  66. log.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  67. return nil
  68. }else{
  69. tuple.Timestamp = ts
  70. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  71. }
  72. }else{
  73. log.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  74. return nil
  75. }
  76. }
  77. return tuple
  78. }
  79. func (p *Preprocessor) parseTime(s string) (time.Time, error){
  80. if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok{
  81. return common.ParseTime(s, f)
  82. }else{
  83. return time.Parse(common.JSISO, s)
  84. }
  85. }
  86. func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j map[string]interface{}, n string) error {
  87. if t, ok := j[n]; ok {
  88. v := reflect.ValueOf(t)
  89. jtype := v.Kind()
  90. switch st := ft.(type) {
  91. case *xsql.BasicType:
  92. switch st.Type {
  93. case xsql.UNKNOWN:
  94. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  95. case xsql.BIGINT:
  96. if jtype == reflect.Int{
  97. r[n] = t.(int)
  98. }else if jtype == reflect.Float64{
  99. r[n] = int(t.(float64))
  100. }else{
  101. return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
  102. }
  103. case xsql.FLOAT:
  104. if jtype == reflect.Float64{
  105. r[n] = t.(float64)
  106. }else{
  107. return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
  108. }
  109. case xsql.STRINGS:
  110. if jtype == reflect.String{
  111. r[n] = t.(string)
  112. }else{
  113. return fmt.Errorf("invalid data type for %s, expect string but found %s", n, t)
  114. }
  115. case xsql.DATETIME:
  116. switch jtype {
  117. case reflect.Int:
  118. ai := t.(int64)
  119. r[n] = common.TimeFromUnixMilli(ai)
  120. case reflect.Float64:
  121. ai := int64(t.(float64))
  122. r[n] = common.TimeFromUnixMilli(ai)
  123. case reflect.String:
  124. if t, err := p.parseTime(t.(string)); err != nil{
  125. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  126. }else{
  127. r[n] = t
  128. }
  129. default:
  130. return fmt.Errorf("invalid data type for %s, expect datatime but find %v", n, t)
  131. }
  132. case xsql.BOOLEAN:
  133. if jtype == reflect.Bool{
  134. r[n] = t.(bool)
  135. }else{
  136. return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
  137. }
  138. default:
  139. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  140. }
  141. case *xsql.ArrayType:
  142. if jtype != reflect.Slice{
  143. return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
  144. }
  145. if tempArr, err := p.addArrayField(st, t.([]interface{})); err !=nil{
  146. return err
  147. }else {
  148. r[n] = tempArr
  149. }
  150. case *xsql.RecType:
  151. if jtype != reflect.Map{
  152. return fmt.Errorf("invalid data type for %s, expect struct but found %s", n, t)
  153. }
  154. nextJ, ok := j[n].(map[string]interface{})
  155. if !ok {
  156. return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
  157. }
  158. nextR := make(map[string]interface{})
  159. for _, nextF := range st.StreamFields {
  160. nextP := strings.ToLower(nextF.Name)
  161. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil{
  162. return e
  163. }
  164. }
  165. r[n] = nextR
  166. default:
  167. return fmt.Errorf("unsupported type %T", st)
  168. }
  169. return nil
  170. }else{
  171. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  172. }
  173. }
  174. //ft must be xsql.ArrayType
  175. //side effect: r[p] will be set to the new array
  176. func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  177. if ft.FieldType != nil { //complex type array or struct
  178. switch st := ft.FieldType.(type) { //Only two complex types supported here
  179. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  180. var tempSlice [][]interface{}
  181. for i, t := range srcSlice{
  182. if reflect.ValueOf(t).Kind() == reflect.Array{
  183. if tempArr, err := p.addArrayField(st, t.([]interface{})); err !=nil{
  184. return nil, err
  185. }else {
  186. tempSlice = append(tempSlice, tempArr.([]interface{}))
  187. }
  188. }else{
  189. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
  190. }
  191. }
  192. return tempSlice, nil
  193. case *xsql.RecType:
  194. var tempSlice []map[string]interface{}
  195. for i, t := range srcSlice{
  196. if reflect.ValueOf(t).Kind() == reflect.Map{
  197. j, ok := t.(map[string]interface{})
  198. if !ok {
  199. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
  200. }
  201. r := make(map[string]interface{})
  202. for _, f := range st.StreamFields {
  203. n := f.Name
  204. if e := p.addRecField(f.FieldType, r, j, n); e != nil{
  205. return nil, e
  206. }
  207. }
  208. tempSlice = append(tempSlice, r)
  209. }else{
  210. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  211. }
  212. }
  213. return tempSlice, nil
  214. default:
  215. return nil, fmt.Errorf("unsupported type %T", st)
  216. }
  217. }else{ //basic type
  218. switch ft.Type {
  219. case xsql.UNKNOWN:
  220. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  221. case xsql.BIGINT:
  222. var tempSlice []int
  223. for i, t := range srcSlice {
  224. if reflect.ValueOf(t).Kind() == reflect.Float64{
  225. tempSlice = append(tempSlice, int(t.(float64)))
  226. }else{
  227. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  228. }
  229. }
  230. return tempSlice, nil
  231. case xsql.FLOAT:
  232. var tempSlice []float64
  233. for i, t := range srcSlice {
  234. if reflect.ValueOf(t).Kind() == reflect.Float64{
  235. tempSlice = append(tempSlice, t.(float64))
  236. }else{
  237. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  238. }
  239. }
  240. return tempSlice, nil
  241. case xsql.STRINGS:
  242. var tempSlice []string
  243. for i, t := range srcSlice {
  244. if reflect.ValueOf(t).Kind() == reflect.String{
  245. tempSlice = append(tempSlice, t.(string))
  246. }else{
  247. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %s", i, t)
  248. }
  249. }
  250. return tempSlice, nil
  251. case xsql.DATETIME:
  252. var tempSlice []time.Time
  253. for i, t := range srcSlice {
  254. jtype := reflect.ValueOf(t).Kind()
  255. switch jtype {
  256. case reflect.Int:
  257. ai := t.(int64)
  258. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  259. case reflect.Float64:
  260. ai := int64(t.(float64))
  261. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  262. case reflect.String:
  263. if ai, err := p.parseTime(t.(string)); err != nil{
  264. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", t, err)
  265. }else{
  266. tempSlice = append(tempSlice, ai)
  267. }
  268. default:
  269. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %v", i, t)
  270. }
  271. }
  272. return tempSlice, nil
  273. case xsql.BOOLEAN:
  274. var tempSlice []bool
  275. for i, t := range srcSlice {
  276. if reflect.ValueOf(t).Kind() == reflect.Bool{
  277. tempSlice = append(tempSlice, t.(bool))
  278. }else{
  279. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
  280. }
  281. }
  282. return tempSlice, nil
  283. default:
  284. return nil, fmt.Errorf("invalid data type for %T, datetime type is not supported yet", ft.Type)
  285. }
  286. }
  287. }