preprocessor.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. package plans
  2. import (
  3. "context"
  4. "engine/common"
  5. "engine/xsql"
  6. "fmt"
  7. "reflect"
  8. "strings"
  9. "time"
  10. )
  11. type Preprocessor struct {
  12. streamStmt *xsql.StreamStmt
  13. isEventTime bool
  14. timestampField string
  15. timestampFormat string
  16. }
  17. func NewPreprocessor(s *xsql.StreamStmt, iet bool) (*Preprocessor, error){
  18. p := &Preprocessor{streamStmt: s, isEventTime: iet}
  19. if iet {
  20. if tf, ok := s.Options["TIMESTAMP"]; ok{
  21. p.timestampField = tf
  22. }else{
  23. return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
  24. }
  25. if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok{
  26. p.timestampFormat = ts
  27. }
  28. }
  29. return p, nil
  30. }
  31. /*
  32. * input: *xsql.Tuple
  33. * output: *xsql.Tuple
  34. */
  35. func (p *Preprocessor) Apply(ctx context.Context, data interface{}) interface{} {
  36. log := common.GetLogger(ctx)
  37. tuple, ok := data.(*xsql.Tuple)
  38. if !ok {
  39. log.Errorf("Expect tuple data type")
  40. return nil
  41. }
  42. log.Debugf("preprocessor receive %s", tuple.Message)
  43. result := make(map[string]interface{})
  44. for _, f := range p.streamStmt.StreamFields {
  45. fname := strings.ToLower(f.Name)
  46. if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil{
  47. log.Errorf("error in preprocessor: %s", e)
  48. return nil
  49. }
  50. }
  51. tuple.Message = result
  52. if p.isEventTime{
  53. if t, ok := result[p.timestampField]; ok{
  54. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil{
  55. log.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  56. return nil
  57. }else{
  58. tuple.Timestamp = ts
  59. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  60. }
  61. }else{
  62. log.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  63. return nil
  64. }
  65. }
  66. return tuple
  67. }
  68. func (p *Preprocessor) parseTime(s string) (time.Time, error){
  69. if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok{
  70. return common.ParseTime(s, f)
  71. }else{
  72. return time.Parse(common.JSISO, s)
  73. }
  74. }
  75. func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j map[string]interface{}, n string) error {
  76. if t, ok := j[n]; ok {
  77. v := reflect.ValueOf(t)
  78. jtype := v.Kind()
  79. switch st := ft.(type) {
  80. case *xsql.BasicType:
  81. switch st.Type {
  82. case xsql.UNKNOWN:
  83. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  84. case xsql.BIGINT:
  85. if jtype == reflect.Int{
  86. r[n] = t.(int)
  87. }else if jtype == reflect.Float64{
  88. r[n] = int(t.(float64))
  89. }else{
  90. return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
  91. }
  92. case xsql.FLOAT:
  93. if jtype == reflect.Float64{
  94. r[n] = t.(float64)
  95. }else{
  96. return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
  97. }
  98. case xsql.STRINGS:
  99. if jtype == reflect.String{
  100. r[n] = t.(string)
  101. }else{
  102. return fmt.Errorf("invalid data type for %s, expect string but found %s", n, t)
  103. }
  104. case xsql.DATETIME:
  105. switch jtype {
  106. case reflect.Int:
  107. ai := t.(int64)
  108. r[n] = common.TimeFromUnixMilli(ai)
  109. case reflect.Float64:
  110. ai := int64(t.(float64))
  111. r[n] = common.TimeFromUnixMilli(ai)
  112. case reflect.String:
  113. if t, err := p.parseTime(t.(string)); err != nil{
  114. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  115. }else{
  116. r[n] = t
  117. }
  118. default:
  119. return fmt.Errorf("invalid data type for %s, expect datatime but find %v", n, t)
  120. }
  121. case xsql.BOOLEAN:
  122. if jtype == reflect.Bool{
  123. r[n] = t.(bool)
  124. }else{
  125. return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
  126. }
  127. default:
  128. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  129. }
  130. case *xsql.ArrayType:
  131. if jtype != reflect.Slice{
  132. return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
  133. }
  134. if tempArr, err := p.addArrayField(st, t.([]interface{})); err !=nil{
  135. return err
  136. }else {
  137. r[n] = tempArr
  138. }
  139. case *xsql.RecType:
  140. if jtype != reflect.Map{
  141. return fmt.Errorf("invalid data type for %s, expect struct but found %s", n, t)
  142. }
  143. nextJ, ok := j[n].(map[string]interface{})
  144. if !ok {
  145. return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
  146. }
  147. nextR := make(map[string]interface{})
  148. for _, nextF := range st.StreamFields {
  149. nextP := strings.ToLower(nextF.Name)
  150. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil{
  151. return e
  152. }
  153. }
  154. r[n] = nextR
  155. default:
  156. return fmt.Errorf("unsupported type %T", st)
  157. }
  158. return nil
  159. }else{
  160. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  161. }
  162. }
  163. //ft must be xsql.ArrayType
  164. //side effect: r[p] will be set to the new array
  165. func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  166. if ft.FieldType != nil { //complex type array or struct
  167. switch st := ft.FieldType.(type) { //Only two complex types supported here
  168. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  169. var tempSlice [][]interface{}
  170. for i, t := range srcSlice{
  171. if reflect.ValueOf(t).Kind() == reflect.Array{
  172. if tempArr, err := p.addArrayField(st, t.([]interface{})); err !=nil{
  173. return nil, err
  174. }else {
  175. tempSlice = append(tempSlice, tempArr.([]interface{}))
  176. }
  177. }else{
  178. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
  179. }
  180. }
  181. return tempSlice, nil
  182. case *xsql.RecType:
  183. var tempSlice []map[string]interface{}
  184. for i, t := range srcSlice{
  185. if reflect.ValueOf(t).Kind() == reflect.Map{
  186. j, ok := t.(map[string]interface{})
  187. if !ok {
  188. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
  189. }
  190. r := make(map[string]interface{})
  191. for _, f := range st.StreamFields {
  192. n := f.Name
  193. if e := p.addRecField(f.FieldType, r, j, n); e != nil{
  194. return nil, e
  195. }
  196. }
  197. tempSlice = append(tempSlice, r)
  198. }else{
  199. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  200. }
  201. }
  202. return tempSlice, nil
  203. default:
  204. return nil, fmt.Errorf("unsupported type %T", st)
  205. }
  206. }else{ //basic type
  207. switch ft.Type {
  208. case xsql.UNKNOWN:
  209. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  210. case xsql.BIGINT:
  211. var tempSlice []int
  212. for i, t := range srcSlice {
  213. if reflect.ValueOf(t).Kind() == reflect.Float64{
  214. tempSlice = append(tempSlice, int(t.(float64)))
  215. }else{
  216. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  217. }
  218. }
  219. return tempSlice, nil
  220. case xsql.FLOAT:
  221. var tempSlice []float64
  222. for i, t := range srcSlice {
  223. if reflect.ValueOf(t).Kind() == reflect.Float64{
  224. tempSlice = append(tempSlice, t.(float64))
  225. }else{
  226. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  227. }
  228. }
  229. return tempSlice, nil
  230. case xsql.STRINGS:
  231. var tempSlice []string
  232. for i, t := range srcSlice {
  233. if reflect.ValueOf(t).Kind() == reflect.String{
  234. tempSlice = append(tempSlice, t.(string))
  235. }else{
  236. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %s", i, t)
  237. }
  238. }
  239. return tempSlice, nil
  240. case xsql.DATETIME:
  241. var tempSlice []time.Time
  242. for i, t := range srcSlice {
  243. jtype := reflect.ValueOf(t).Kind()
  244. switch jtype {
  245. case reflect.Int:
  246. ai := t.(int64)
  247. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  248. case reflect.Float64:
  249. ai := int64(t.(float64))
  250. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  251. case reflect.String:
  252. if ai, err := p.parseTime(t.(string)); err != nil{
  253. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", t, err)
  254. }else{
  255. tempSlice = append(tempSlice, ai)
  256. }
  257. default:
  258. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %v", i, t)
  259. }
  260. }
  261. return tempSlice, nil
  262. case xsql.BOOLEAN:
  263. var tempSlice []bool
  264. for i, t := range srcSlice {
  265. if reflect.ValueOf(t).Kind() == reflect.Bool{
  266. tempSlice = append(tempSlice, t.(bool))
  267. }else{
  268. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
  269. }
  270. }
  271. return tempSlice, nil
  272. default:
  273. return nil, fmt.Errorf("invalid data type for %T, datetime type is not supported yet", ft.Type)
  274. }
  275. }
  276. }