preprocessor.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "math"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "time"
  13. )
  14. type Preprocessor struct {
  15. streamStmt *xsql.StreamStmt
  16. aliasFields xsql.Fields
  17. isEventTime bool
  18. timestampField string
  19. timestampFormat string
  20. }
  21. func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocessor, error) {
  22. p := &Preprocessor{streamStmt: s, aliasFields: fs, isEventTime: iet}
  23. if iet {
  24. if tf, ok := s.Options["TIMESTAMP"]; ok {
  25. p.timestampField = tf
  26. } else {
  27. return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
  28. }
  29. if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok {
  30. p.timestampFormat = ts
  31. }
  32. }
  33. return p, nil
  34. }
  35. /*
  36. * input: *xsql.Tuple
  37. * output: *xsql.Tuple
  38. */
  39. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  40. log := ctx.GetLogger()
  41. tuple, ok := data.(*xsql.Tuple)
  42. if !ok {
  43. return fmt.Errorf("expect tuple data type")
  44. }
  45. log.Debugf("preprocessor receive %s", tuple.Message)
  46. result := make(map[string]interface{})
  47. if p.streamStmt.StreamFields != nil {
  48. for _, f := range p.streamStmt.StreamFields {
  49. if e := p.addRecField(f.FieldType, result, tuple.Message, f.Name); e != nil {
  50. return fmt.Errorf("error in preprocessor: %s", e)
  51. }
  52. }
  53. } else {
  54. result = tuple.Message
  55. }
  56. //If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
  57. //Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
  58. for _, f := range p.aliasFields {
  59. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv)}
  60. v := ve.Eval(f.Expr)
  61. if _, ok := v.(error); ok {
  62. return v
  63. } else {
  64. result[f.AName] = v
  65. }
  66. }
  67. tuple.Message = result
  68. if p.isEventTime {
  69. if t, ok := result[p.timestampField]; ok {
  70. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  71. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  72. } else {
  73. tuple.Timestamp = ts
  74. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  75. }
  76. } else {
  77. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  78. }
  79. }
  80. return tuple
  81. }
  82. func (p *Preprocessor) parseTime(s string) (time.Time, error) {
  83. if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok {
  84. return common.ParseTime(s, f)
  85. } else {
  86. return time.Parse(common.JSISO, s)
  87. }
  88. }
  89. func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j xsql.Message, n string) error {
  90. if t, ok := j.Value(n); ok {
  91. v := reflect.ValueOf(t)
  92. jtype := v.Kind()
  93. switch st := ft.(type) {
  94. case *xsql.BasicType:
  95. switch st.Type {
  96. case xsql.UNKNOWN:
  97. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  98. case xsql.BIGINT:
  99. if jtype == reflect.Int {
  100. r[n] = t.(int)
  101. } else if jtype == reflect.Float64 {
  102. if tt, ok1 := t.(float64); ok1 {
  103. if tt > math.MaxInt64 {
  104. r[n] = uint64(tt)
  105. } else {
  106. r[n] = int(tt)
  107. }
  108. }
  109. } else if jtype == reflect.String {
  110. if i, err := strconv.Atoi(t.(string)); err != nil {
  111. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  112. } else {
  113. r[n] = i
  114. }
  115. } else if jtype == reflect.Uint64 {
  116. r[n] = t.(uint64)
  117. } else {
  118. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  119. }
  120. case xsql.FLOAT:
  121. if jtype == reflect.Float64 {
  122. r[n] = t.(float64)
  123. } else if jtype == reflect.String {
  124. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  125. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  126. } else {
  127. r[n] = f
  128. }
  129. } else {
  130. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  131. }
  132. case xsql.STRINGS:
  133. if jtype == reflect.String {
  134. r[n] = t.(string)
  135. } else {
  136. return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
  137. }
  138. case xsql.DATETIME:
  139. switch jtype {
  140. case reflect.Int:
  141. ai := t.(int64)
  142. r[n] = common.TimeFromUnixMilli(ai)
  143. case reflect.Float64:
  144. ai := int64(t.(float64))
  145. r[n] = common.TimeFromUnixMilli(ai)
  146. case reflect.String:
  147. if t, err := p.parseTime(t.(string)); err != nil {
  148. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  149. } else {
  150. r[n] = t
  151. }
  152. default:
  153. return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
  154. }
  155. case xsql.BOOLEAN:
  156. if jtype == reflect.Bool {
  157. r[n] = t.(bool)
  158. } else if jtype == reflect.String {
  159. if i, err := strconv.ParseBool(t.(string)); err != nil {
  160. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  161. } else {
  162. r[n] = i
  163. }
  164. } else {
  165. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  166. }
  167. default:
  168. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  169. }
  170. case *xsql.ArrayType:
  171. var s []interface{}
  172. if t == nil {
  173. s = nil
  174. } else if jtype == reflect.Slice {
  175. s = t.([]interface{})
  176. } else if jtype == reflect.String {
  177. err := json.Unmarshal([]byte(t.(string)), &s)
  178. if err != nil {
  179. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  180. }
  181. } else {
  182. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  183. }
  184. if tempArr, err := p.addArrayField(st, s); err != nil {
  185. return fmt.Errorf("fail to parse field %s: %s", n, err)
  186. } else {
  187. r[n] = tempArr
  188. }
  189. case *xsql.RecType:
  190. nextJ := make(map[string]interface{})
  191. if t == nil {
  192. nextJ = nil
  193. r[n] = nextJ
  194. return nil
  195. } else if jtype == reflect.Map {
  196. nextJ, ok = t.(map[string]interface{})
  197. if !ok {
  198. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  199. }
  200. } else if jtype == reflect.String {
  201. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  202. if err != nil {
  203. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  204. }
  205. } else {
  206. return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
  207. }
  208. nextR := make(map[string]interface{})
  209. for _, nextF := range st.StreamFields {
  210. nextP := strings.ToLower(nextF.Name)
  211. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  212. return e
  213. }
  214. }
  215. r[n] = nextR
  216. default:
  217. return fmt.Errorf("unsupported type %T", st)
  218. }
  219. return nil
  220. } else {
  221. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  222. }
  223. }
  224. //ft must be xsql.ArrayType
  225. //side effect: r[p] will be set to the new array
  226. func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  227. if ft.FieldType != nil { //complex type array or struct
  228. switch st := ft.FieldType.(type) { //Only two complex types supported here
  229. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  230. if srcSlice == nil {
  231. return [][]interface{}(nil), nil
  232. }
  233. var s []interface{}
  234. var tempSlice reflect.Value
  235. for i, t := range srcSlice {
  236. jtype := reflect.ValueOf(t).Kind()
  237. if t == nil {
  238. s = nil
  239. } else if jtype == reflect.Slice || jtype == reflect.Array {
  240. s = t.([]interface{})
  241. } else if jtype == reflect.String {
  242. err := json.Unmarshal([]byte(t.(string)), &s)
  243. if err != nil {
  244. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  245. }
  246. } else {
  247. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  248. }
  249. if tempArr, err := p.addArrayField(st, s); err != nil {
  250. return nil, err
  251. } else {
  252. if !tempSlice.IsValid() {
  253. s := reflect.TypeOf(tempArr)
  254. tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
  255. }
  256. tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
  257. }
  258. }
  259. return tempSlice.Interface(), nil
  260. case *xsql.RecType:
  261. if srcSlice == nil {
  262. return []map[string]interface{}(nil), nil
  263. }
  264. tempSlice := make([]map[string]interface{}, 0)
  265. for i, t := range srcSlice {
  266. jtype := reflect.ValueOf(t).Kind()
  267. j := make(map[string]interface{})
  268. var ok bool
  269. if t == nil {
  270. j = nil
  271. tempSlice = append(tempSlice, j)
  272. continue
  273. } else if jtype == reflect.Map {
  274. j, ok = t.(map[string]interface{})
  275. if !ok {
  276. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  277. }
  278. } else if jtype == reflect.String {
  279. err := json.Unmarshal([]byte(t.(string)), &j)
  280. if err != nil {
  281. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  282. }
  283. } else {
  284. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  285. }
  286. r := make(map[string]interface{})
  287. for _, f := range st.StreamFields {
  288. n := f.Name
  289. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  290. return nil, e
  291. }
  292. }
  293. tempSlice = append(tempSlice, r)
  294. }
  295. return tempSlice, nil
  296. default:
  297. return nil, fmt.Errorf("unsupported type %T", st)
  298. }
  299. } else { //basic type
  300. switch ft.Type {
  301. case xsql.UNKNOWN:
  302. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  303. case xsql.BIGINT:
  304. if srcSlice == nil {
  305. return []int(nil), nil
  306. }
  307. tempSlice := make([]int, 0)
  308. for i, t := range srcSlice {
  309. jtype := reflect.ValueOf(t).Kind()
  310. if jtype == reflect.Float64 {
  311. tempSlice = append(tempSlice, int(t.(float64)))
  312. } else if jtype == reflect.String {
  313. if v, err := strconv.Atoi(t.(string)); err != nil {
  314. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  315. } else {
  316. tempSlice = append(tempSlice, v)
  317. }
  318. } else {
  319. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  320. }
  321. }
  322. return tempSlice, nil
  323. case xsql.FLOAT:
  324. if srcSlice == nil {
  325. return []float64(nil), nil
  326. }
  327. tempSlice := make([]float64, 0)
  328. for i, t := range srcSlice {
  329. jtype := reflect.ValueOf(t).Kind()
  330. if jtype == reflect.Float64 {
  331. tempSlice = append(tempSlice, t.(float64))
  332. } else if jtype == reflect.String {
  333. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  334. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  335. } else {
  336. tempSlice = append(tempSlice, f)
  337. }
  338. } else {
  339. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  340. }
  341. }
  342. return tempSlice, nil
  343. case xsql.STRINGS:
  344. if srcSlice == nil {
  345. return []string(nil), nil
  346. }
  347. tempSlice := make([]string, 0)
  348. for i, t := range srcSlice {
  349. if reflect.ValueOf(t).Kind() == reflect.String {
  350. tempSlice = append(tempSlice, t.(string))
  351. } else {
  352. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
  353. }
  354. }
  355. return tempSlice, nil
  356. case xsql.DATETIME:
  357. if srcSlice == nil {
  358. return []time.Time(nil), nil
  359. }
  360. tempSlice := make([]time.Time, 0)
  361. for i, t := range srcSlice {
  362. jtype := reflect.ValueOf(t).Kind()
  363. switch jtype {
  364. case reflect.Int:
  365. ai := t.(int64)
  366. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  367. case reflect.Float64:
  368. ai := int64(t.(float64))
  369. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  370. case reflect.String:
  371. if ai, err := p.parseTime(t.(string)); err != nil {
  372. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
  373. } else {
  374. tempSlice = append(tempSlice, ai)
  375. }
  376. default:
  377. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
  378. }
  379. }
  380. return tempSlice, nil
  381. case xsql.BOOLEAN:
  382. if srcSlice == nil {
  383. return []bool(nil), nil
  384. }
  385. tempSlice := make([]bool, 0)
  386. for i, t := range srcSlice {
  387. jtype := reflect.ValueOf(t).Kind()
  388. if jtype == reflect.Bool {
  389. tempSlice = append(tempSlice, t.(bool))
  390. } else if jtype == reflect.String {
  391. if v, err := strconv.ParseBool(t.(string)); err != nil {
  392. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  393. } else {
  394. tempSlice = append(tempSlice, v)
  395. }
  396. } else {
  397. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  398. }
  399. }
  400. return tempSlice, nil
  401. default:
  402. return nil, fmt.Errorf("invalid data type for %T", ft.Type)
  403. }
  404. }
  405. }