preprocessor.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. package plans
  2. import (
  3. "context"
  4. "engine/common"
  5. "engine/xsql"
  6. "fmt"
  7. "reflect"
  8. "strings"
  9. "time"
  10. )
  11. type Preprocessor struct {
  12. streamStmt *xsql.StreamStmt
  13. isEventTime bool
  14. timestampField string
  15. timestampFormat string
  16. }
  17. func NewPreprocessor(s *xsql.StreamStmt, iet bool) (*Preprocessor, error){
  18. p := &Preprocessor{streamStmt: s, isEventTime: iet}
  19. if iet {
  20. if tf, ok := s.Options["TIMESTAMP"]; ok{
  21. p.timestampField = tf
  22. }else{
  23. return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
  24. }
  25. if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok{
  26. p.timestampFormat = ts
  27. }
  28. }
  29. return p, nil
  30. }
  31. /*
  32. * input: *xsql.Tuple
  33. * output: *xsql.Tuple
  34. */
  35. func (p *Preprocessor) Apply(ctx context.Context, data interface{}) interface{} {
  36. log := common.GetLogger(ctx)
  37. tuple, ok := data.(*xsql.Tuple)
  38. if !ok {
  39. log.Errorf("Expect tuple data type")
  40. return nil
  41. }
  42. log.Debugf("preprocessor receive %s", tuple.Message)
  43. result := make(map[string]interface{})
  44. for _, f := range p.streamStmt.StreamFields {
  45. fname := strings.ToLower(f.Name)
  46. if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil{
  47. log.Errorf("error in preprocessor: %s", e)
  48. return nil
  49. }
  50. }
  51. //Add the value of special keys
  52. for _, v := range xsql.SpecialKeyMapper {
  53. if v1, ok := tuple.Message[v]; ok {
  54. result[v] = v1
  55. }
  56. }
  57. tuple.Message = result
  58. if p.isEventTime{
  59. if t, ok := result[p.timestampField]; ok{
  60. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil{
  61. log.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  62. return nil
  63. }else{
  64. tuple.Timestamp = ts
  65. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  66. }
  67. }else{
  68. log.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  69. return nil
  70. }
  71. }
  72. return tuple
  73. }
  74. func (p *Preprocessor) parseTime(s string) (time.Time, error){
  75. if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok{
  76. return common.ParseTime(s, f)
  77. }else{
  78. return time.Parse(common.JSISO, s)
  79. }
  80. }
  81. func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j map[string]interface{}, n string) error {
  82. if t, ok := j[n]; ok {
  83. v := reflect.ValueOf(t)
  84. jtype := v.Kind()
  85. switch st := ft.(type) {
  86. case *xsql.BasicType:
  87. switch st.Type {
  88. case xsql.UNKNOWN:
  89. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  90. case xsql.BIGINT:
  91. if jtype == reflect.Int{
  92. r[n] = t.(int)
  93. }else if jtype == reflect.Float64{
  94. r[n] = int(t.(float64))
  95. }else{
  96. return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
  97. }
  98. case xsql.FLOAT:
  99. if jtype == reflect.Float64{
  100. r[n] = t.(float64)
  101. }else{
  102. return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
  103. }
  104. case xsql.STRINGS:
  105. if jtype == reflect.String{
  106. r[n] = t.(string)
  107. }else{
  108. return fmt.Errorf("invalid data type for %s, expect string but found %s", n, t)
  109. }
  110. case xsql.DATETIME:
  111. switch jtype {
  112. case reflect.Int:
  113. ai := t.(int64)
  114. r[n] = common.TimeFromUnixMilli(ai)
  115. case reflect.Float64:
  116. ai := int64(t.(float64))
  117. r[n] = common.TimeFromUnixMilli(ai)
  118. case reflect.String:
  119. if t, err := p.parseTime(t.(string)); err != nil{
  120. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  121. }else{
  122. r[n] = t
  123. }
  124. default:
  125. return fmt.Errorf("invalid data type for %s, expect datatime but find %v", n, t)
  126. }
  127. case xsql.BOOLEAN:
  128. if jtype == reflect.Bool{
  129. r[n] = t.(bool)
  130. }else{
  131. return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
  132. }
  133. default:
  134. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  135. }
  136. case *xsql.ArrayType:
  137. if jtype != reflect.Slice{
  138. return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
  139. }
  140. if tempArr, err := p.addArrayField(st, t.([]interface{})); err !=nil{
  141. return err
  142. }else {
  143. r[n] = tempArr
  144. }
  145. case *xsql.RecType:
  146. if jtype != reflect.Map{
  147. return fmt.Errorf("invalid data type for %s, expect struct but found %s", n, t)
  148. }
  149. nextJ, ok := j[n].(map[string]interface{})
  150. if !ok {
  151. return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
  152. }
  153. nextR := make(map[string]interface{})
  154. for _, nextF := range st.StreamFields {
  155. nextP := strings.ToLower(nextF.Name)
  156. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil{
  157. return e
  158. }
  159. }
  160. r[n] = nextR
  161. default:
  162. return fmt.Errorf("unsupported type %T", st)
  163. }
  164. return nil
  165. }else{
  166. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  167. }
  168. }
  169. //ft must be xsql.ArrayType
  170. //side effect: r[p] will be set to the new array
  171. func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  172. if ft.FieldType != nil { //complex type array or struct
  173. switch st := ft.FieldType.(type) { //Only two complex types supported here
  174. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  175. var tempSlice [][]interface{}
  176. for i, t := range srcSlice{
  177. if reflect.ValueOf(t).Kind() == reflect.Array{
  178. if tempArr, err := p.addArrayField(st, t.([]interface{})); err !=nil{
  179. return nil, err
  180. }else {
  181. tempSlice = append(tempSlice, tempArr.([]interface{}))
  182. }
  183. }else{
  184. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
  185. }
  186. }
  187. return tempSlice, nil
  188. case *xsql.RecType:
  189. var tempSlice []map[string]interface{}
  190. for i, t := range srcSlice{
  191. if reflect.ValueOf(t).Kind() == reflect.Map{
  192. j, ok := t.(map[string]interface{})
  193. if !ok {
  194. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
  195. }
  196. r := make(map[string]interface{})
  197. for _, f := range st.StreamFields {
  198. n := f.Name
  199. if e := p.addRecField(f.FieldType, r, j, n); e != nil{
  200. return nil, e
  201. }
  202. }
  203. tempSlice = append(tempSlice, r)
  204. }else{
  205. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  206. }
  207. }
  208. return tempSlice, nil
  209. default:
  210. return nil, fmt.Errorf("unsupported type %T", st)
  211. }
  212. }else{ //basic type
  213. switch ft.Type {
  214. case xsql.UNKNOWN:
  215. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  216. case xsql.BIGINT:
  217. var tempSlice []int
  218. for i, t := range srcSlice {
  219. if reflect.ValueOf(t).Kind() == reflect.Float64{
  220. tempSlice = append(tempSlice, int(t.(float64)))
  221. }else{
  222. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  223. }
  224. }
  225. return tempSlice, nil
  226. case xsql.FLOAT:
  227. var tempSlice []float64
  228. for i, t := range srcSlice {
  229. if reflect.ValueOf(t).Kind() == reflect.Float64{
  230. tempSlice = append(tempSlice, t.(float64))
  231. }else{
  232. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  233. }
  234. }
  235. return tempSlice, nil
  236. case xsql.STRINGS:
  237. var tempSlice []string
  238. for i, t := range srcSlice {
  239. if reflect.ValueOf(t).Kind() == reflect.String{
  240. tempSlice = append(tempSlice, t.(string))
  241. }else{
  242. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %s", i, t)
  243. }
  244. }
  245. return tempSlice, nil
  246. case xsql.DATETIME:
  247. var tempSlice []time.Time
  248. for i, t := range srcSlice {
  249. jtype := reflect.ValueOf(t).Kind()
  250. switch jtype {
  251. case reflect.Int:
  252. ai := t.(int64)
  253. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  254. case reflect.Float64:
  255. ai := int64(t.(float64))
  256. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  257. case reflect.String:
  258. if ai, err := p.parseTime(t.(string)); err != nil{
  259. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", t, err)
  260. }else{
  261. tempSlice = append(tempSlice, ai)
  262. }
  263. default:
  264. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %v", i, t)
  265. }
  266. }
  267. return tempSlice, nil
  268. case xsql.BOOLEAN:
  269. var tempSlice []bool
  270. for i, t := range srcSlice {
  271. if reflect.ValueOf(t).Kind() == reflect.Bool{
  272. tempSlice = append(tempSlice, t.(bool))
  273. }else{
  274. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
  275. }
  276. }
  277. return tempSlice, nil
  278. default:
  279. return nil, fmt.Errorf("invalid data type for %T, datetime type is not supported yet", ft.Type)
  280. }
  281. }
  282. }