preprocessor.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "math"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "time"
  13. )
  14. type Preprocessor struct {
  15. streamStmt *xsql.StreamStmt
  16. aliasFields xsql.Fields
  17. isEventTime bool
  18. timestampField string
  19. timestampFormat string
  20. }
  21. func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocessor, error) {
  22. p := &Preprocessor{streamStmt: s, aliasFields: fs, isEventTime: iet}
  23. if iet {
  24. if tf, ok := s.Options["TIMESTAMP"]; ok {
  25. p.timestampField = tf
  26. } else {
  27. return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
  28. }
  29. if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok {
  30. p.timestampFormat = ts
  31. }
  32. }
  33. return p, nil
  34. }
  35. /*
  36. * input: *xsql.Tuple
  37. * output: *xsql.Tuple
  38. */
  39. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  40. log := ctx.GetLogger()
  41. tuple, ok := data.(*xsql.Tuple)
  42. if !ok {
  43. return fmt.Errorf("expect tuple data type")
  44. }
  45. log.Debugf("preprocessor receive %s", tuple.Message)
  46. result := make(map[string]interface{})
  47. if p.streamStmt.StreamFields != nil {
  48. for _, f := range p.streamStmt.StreamFields {
  49. fname := strings.ToLower(f.Name)
  50. if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil {
  51. return fmt.Errorf("error in preprocessor: %s", e)
  52. }
  53. }
  54. } else {
  55. result = tuple.Message
  56. }
  57. //If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
  58. //Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
  59. for _, f := range p.aliasFields {
  60. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv)}
  61. v := ve.Eval(f.Expr)
  62. if _, ok := v.(error); ok {
  63. return v
  64. } else {
  65. result[strings.ToLower(f.AName)] = v
  66. }
  67. }
  68. tuple.Message = result
  69. if p.isEventTime {
  70. if t, ok := result[p.timestampField]; ok {
  71. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  72. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  73. } else {
  74. tuple.Timestamp = ts
  75. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  76. }
  77. } else {
  78. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  79. }
  80. }
  81. return tuple
  82. }
  83. func (p *Preprocessor) parseTime(s string) (time.Time, error) {
  84. if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok {
  85. return common.ParseTime(s, f)
  86. } else {
  87. return time.Parse(common.JSISO, s)
  88. }
  89. }
  90. func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j map[string]interface{}, n string) error {
  91. if t, ok := j[n]; ok {
  92. v := reflect.ValueOf(t)
  93. jtype := v.Kind()
  94. switch st := ft.(type) {
  95. case *xsql.BasicType:
  96. switch st.Type {
  97. case xsql.UNKNOWN:
  98. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  99. case xsql.BIGINT:
  100. if jtype == reflect.Int {
  101. r[n] = t.(int)
  102. } else if jtype == reflect.Float64 {
  103. if tt, ok1 := t.(float64); ok1 {
  104. if tt > math.MaxInt64 {
  105. r[n] = uint64(tt)
  106. } else {
  107. r[n] = int(tt)
  108. }
  109. }
  110. } else if jtype == reflect.String {
  111. if i, err := strconv.Atoi(t.(string)); err != nil {
  112. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  113. } else {
  114. r[n] = i
  115. }
  116. } else if jtype == reflect.Uint64 {
  117. r[n] = t.(uint64)
  118. } else {
  119. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  120. }
  121. case xsql.FLOAT:
  122. if jtype == reflect.Float64 {
  123. r[n] = t.(float64)
  124. } else if jtype == reflect.String {
  125. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  126. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  127. } else {
  128. r[n] = f
  129. }
  130. } else {
  131. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  132. }
  133. case xsql.STRINGS:
  134. if jtype == reflect.String {
  135. r[n] = t.(string)
  136. } else {
  137. return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
  138. }
  139. case xsql.DATETIME:
  140. switch jtype {
  141. case reflect.Int:
  142. ai := t.(int64)
  143. r[n] = common.TimeFromUnixMilli(ai)
  144. case reflect.Float64:
  145. ai := int64(t.(float64))
  146. r[n] = common.TimeFromUnixMilli(ai)
  147. case reflect.String:
  148. if t, err := p.parseTime(t.(string)); err != nil {
  149. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  150. } else {
  151. r[n] = t
  152. }
  153. default:
  154. return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
  155. }
  156. case xsql.BOOLEAN:
  157. if jtype == reflect.Bool {
  158. r[n] = t.(bool)
  159. } else if jtype == reflect.String {
  160. if i, err := strconv.ParseBool(t.(string)); err != nil {
  161. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  162. } else {
  163. r[n] = i
  164. }
  165. } else {
  166. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  167. }
  168. default:
  169. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  170. }
  171. case *xsql.ArrayType:
  172. var s []interface{}
  173. if t == nil {
  174. s = nil
  175. } else if jtype == reflect.Slice {
  176. s = t.([]interface{})
  177. } else if jtype == reflect.String {
  178. err := json.Unmarshal([]byte(t.(string)), &s)
  179. if err != nil {
  180. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  181. }
  182. } else {
  183. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  184. }
  185. if tempArr, err := p.addArrayField(st, s); err != nil {
  186. return fmt.Errorf("fail to parse field %s: %s", n, err)
  187. } else {
  188. r[n] = tempArr
  189. }
  190. case *xsql.RecType:
  191. nextJ := make(map[string]interface{})
  192. if t == nil {
  193. nextJ = nil
  194. r[n] = nextJ
  195. return nil
  196. } else if jtype == reflect.Map {
  197. nextJ, ok = t.(map[string]interface{})
  198. if !ok {
  199. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  200. }
  201. } else if jtype == reflect.String {
  202. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  203. if err != nil {
  204. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  205. }
  206. } else {
  207. return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
  208. }
  209. nextR := make(map[string]interface{})
  210. for _, nextF := range st.StreamFields {
  211. nextP := strings.ToLower(nextF.Name)
  212. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  213. return e
  214. }
  215. }
  216. r[n] = nextR
  217. default:
  218. return fmt.Errorf("unsupported type %T", st)
  219. }
  220. return nil
  221. } else {
  222. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  223. }
  224. }
  225. //ft must be xsql.ArrayType
  226. //side effect: r[p] will be set to the new array
  227. func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  228. if ft.FieldType != nil { //complex type array or struct
  229. switch st := ft.FieldType.(type) { //Only two complex types supported here
  230. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  231. if srcSlice == nil {
  232. return [][]interface{}(nil), nil
  233. }
  234. var s []interface{}
  235. var tempSlice reflect.Value
  236. for i, t := range srcSlice {
  237. jtype := reflect.ValueOf(t).Kind()
  238. if t == nil {
  239. s = nil
  240. } else if jtype == reflect.Slice || jtype == reflect.Array {
  241. s = t.([]interface{})
  242. } else if jtype == reflect.String {
  243. err := json.Unmarshal([]byte(t.(string)), &s)
  244. if err != nil {
  245. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  246. }
  247. } else {
  248. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  249. }
  250. if tempArr, err := p.addArrayField(st, s); err != nil {
  251. return nil, err
  252. } else {
  253. if !tempSlice.IsValid() {
  254. s := reflect.TypeOf(tempArr)
  255. tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
  256. }
  257. tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
  258. }
  259. }
  260. return tempSlice.Interface(), nil
  261. case *xsql.RecType:
  262. if srcSlice == nil {
  263. return []map[string]interface{}(nil), nil
  264. }
  265. tempSlice := make([]map[string]interface{}, 0)
  266. for i, t := range srcSlice {
  267. jtype := reflect.ValueOf(t).Kind()
  268. j := make(map[string]interface{})
  269. var ok bool
  270. if t == nil {
  271. j = nil
  272. tempSlice = append(tempSlice, j)
  273. continue
  274. } else if jtype == reflect.Map {
  275. j, ok = t.(map[string]interface{})
  276. if !ok {
  277. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  278. }
  279. } else if jtype == reflect.String {
  280. err := json.Unmarshal([]byte(t.(string)), &j)
  281. if err != nil {
  282. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  283. }
  284. } else {
  285. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  286. }
  287. r := make(map[string]interface{})
  288. for _, f := range st.StreamFields {
  289. n := f.Name
  290. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  291. return nil, e
  292. }
  293. }
  294. tempSlice = append(tempSlice, r)
  295. }
  296. return tempSlice, nil
  297. default:
  298. return nil, fmt.Errorf("unsupported type %T", st)
  299. }
  300. } else { //basic type
  301. switch ft.Type {
  302. case xsql.UNKNOWN:
  303. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  304. case xsql.BIGINT:
  305. if srcSlice == nil {
  306. return []int(nil), nil
  307. }
  308. tempSlice := make([]int, 0)
  309. for i, t := range srcSlice {
  310. jtype := reflect.ValueOf(t).Kind()
  311. if jtype == reflect.Float64 {
  312. tempSlice = append(tempSlice, int(t.(float64)))
  313. } else if jtype == reflect.String {
  314. if v, err := strconv.Atoi(t.(string)); err != nil {
  315. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  316. } else {
  317. tempSlice = append(tempSlice, v)
  318. }
  319. } else {
  320. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  321. }
  322. }
  323. return tempSlice, nil
  324. case xsql.FLOAT:
  325. if srcSlice == nil {
  326. return []float64(nil), nil
  327. }
  328. tempSlice := make([]float64, 0)
  329. for i, t := range srcSlice {
  330. jtype := reflect.ValueOf(t).Kind()
  331. if jtype == reflect.Float64 {
  332. tempSlice = append(tempSlice, t.(float64))
  333. } else if jtype == reflect.String {
  334. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  335. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  336. } else {
  337. tempSlice = append(tempSlice, f)
  338. }
  339. } else {
  340. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  341. }
  342. }
  343. return tempSlice, nil
  344. case xsql.STRINGS:
  345. if srcSlice == nil {
  346. return []string(nil), nil
  347. }
  348. tempSlice := make([]string, 0)
  349. for i, t := range srcSlice {
  350. if reflect.ValueOf(t).Kind() == reflect.String {
  351. tempSlice = append(tempSlice, t.(string))
  352. } else {
  353. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
  354. }
  355. }
  356. return tempSlice, nil
  357. case xsql.DATETIME:
  358. if srcSlice == nil {
  359. return []time.Time(nil), nil
  360. }
  361. tempSlice := make([]time.Time, 0)
  362. for i, t := range srcSlice {
  363. jtype := reflect.ValueOf(t).Kind()
  364. switch jtype {
  365. case reflect.Int:
  366. ai := t.(int64)
  367. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  368. case reflect.Float64:
  369. ai := int64(t.(float64))
  370. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  371. case reflect.String:
  372. if ai, err := p.parseTime(t.(string)); err != nil {
  373. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
  374. } else {
  375. tempSlice = append(tempSlice, ai)
  376. }
  377. default:
  378. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
  379. }
  380. }
  381. return tempSlice, nil
  382. case xsql.BOOLEAN:
  383. if srcSlice == nil {
  384. return []bool(nil), nil
  385. }
  386. tempSlice := make([]bool, 0)
  387. for i, t := range srcSlice {
  388. jtype := reflect.ValueOf(t).Kind()
  389. if jtype == reflect.Bool {
  390. tempSlice = append(tempSlice, t.(bool))
  391. } else if jtype == reflect.String {
  392. if v, err := strconv.ParseBool(t.(string)); err != nil {
  393. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  394. } else {
  395. tempSlice = append(tempSlice, v)
  396. }
  397. } else {
  398. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  399. }
  400. }
  401. return tempSlice, nil
  402. default:
  403. return nil, fmt.Errorf("invalid data type for %T", ft.Type)
  404. }
  405. }
  406. }