preprocessor.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "math"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "time"
  13. )
  14. type Preprocessor struct {
  15. streamStmt *xsql.StreamStmt
  16. aliasFields xsql.Fields
  17. isSelectAll bool
  18. isEventTime bool
  19. timestampField string
  20. timestampFormat string
  21. }
  22. func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool, isa bool) (*Preprocessor, error) {
  23. p := &Preprocessor{streamStmt: s, aliasFields: fs, isEventTime: iet, isSelectAll: isa}
  24. if iet {
  25. if tf, ok := s.Options["TIMESTAMP"]; ok {
  26. p.timestampField = tf
  27. } else {
  28. return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
  29. }
  30. if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok {
  31. p.timestampFormat = ts
  32. }
  33. }
  34. return p, nil
  35. }
  36. /*
  37. * input: *xsql.Tuple
  38. * output: *xsql.Tuple
  39. */
  40. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  41. log := ctx.GetLogger()
  42. tuple, ok := data.(*xsql.Tuple)
  43. if !ok {
  44. return fmt.Errorf("expect tuple data type")
  45. }
  46. log.Debugf("preprocessor receive %s", tuple.Message)
  47. result := make(map[string]interface{})
  48. if p.streamStmt.StreamFields != nil {
  49. for _, f := range p.streamStmt.StreamFields {
  50. if e := p.addRecField(f.FieldType, result, tuple.Message, f.Name); e != nil {
  51. return fmt.Errorf("error in preprocessor: %s", e)
  52. }
  53. }
  54. } else {
  55. if p.isSelectAll {
  56. result = tuple.Message
  57. } else {
  58. result = xsql.LowercaseKeyMap(tuple.Message)
  59. }
  60. }
  61. //If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
  62. //Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
  63. for _, f := range p.aliasFields {
  64. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv)}
  65. v := ve.Eval(f.Expr)
  66. if _, ok := v.(error); ok {
  67. return v
  68. } else {
  69. result[f.AName] = v
  70. }
  71. }
  72. tuple.Message = result
  73. if p.isEventTime {
  74. if t, ok := result[p.timestampField]; ok {
  75. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  76. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  77. } else {
  78. tuple.Timestamp = ts
  79. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  80. }
  81. } else {
  82. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  83. }
  84. }
  85. return tuple
  86. }
  87. func (p *Preprocessor) parseTime(s string) (time.Time, error) {
  88. if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok {
  89. return common.ParseTime(s, f)
  90. } else {
  91. return time.Parse(common.JSISO, s)
  92. }
  93. }
  94. func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j xsql.Message, n string) error {
  95. if t, ok := j.Value(n); ok {
  96. v := reflect.ValueOf(t)
  97. jtype := v.Kind()
  98. switch st := ft.(type) {
  99. case *xsql.BasicType:
  100. switch st.Type {
  101. case xsql.UNKNOWN:
  102. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  103. case xsql.BIGINT:
  104. if jtype == reflect.Int {
  105. r[n] = t.(int)
  106. } else if jtype == reflect.Float64 {
  107. if tt, ok1 := t.(float64); ok1 {
  108. if tt > math.MaxInt64 {
  109. r[n] = uint64(tt)
  110. } else {
  111. r[n] = int(tt)
  112. }
  113. }
  114. } else if jtype == reflect.String {
  115. if i, err := strconv.Atoi(t.(string)); err != nil {
  116. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  117. } else {
  118. r[n] = i
  119. }
  120. } else if jtype == reflect.Uint64 {
  121. r[n] = t.(uint64)
  122. } else {
  123. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  124. }
  125. case xsql.FLOAT:
  126. if jtype == reflect.Float64 {
  127. r[n] = t.(float64)
  128. } else if jtype == reflect.String {
  129. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  130. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  131. } else {
  132. r[n] = f
  133. }
  134. } else {
  135. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  136. }
  137. case xsql.STRINGS:
  138. if jtype == reflect.String {
  139. r[n] = t.(string)
  140. } else {
  141. return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
  142. }
  143. case xsql.DATETIME:
  144. switch jtype {
  145. case reflect.Int:
  146. ai := t.(int64)
  147. r[n] = common.TimeFromUnixMilli(ai)
  148. case reflect.Float64:
  149. ai := int64(t.(float64))
  150. r[n] = common.TimeFromUnixMilli(ai)
  151. case reflect.String:
  152. if t, err := p.parseTime(t.(string)); err != nil {
  153. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  154. } else {
  155. r[n] = t
  156. }
  157. default:
  158. return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
  159. }
  160. case xsql.BOOLEAN:
  161. if jtype == reflect.Bool {
  162. r[n] = t.(bool)
  163. } else if jtype == reflect.String {
  164. if i, err := strconv.ParseBool(t.(string)); err != nil {
  165. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  166. } else {
  167. r[n] = i
  168. }
  169. } else {
  170. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  171. }
  172. default:
  173. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  174. }
  175. case *xsql.ArrayType:
  176. var s []interface{}
  177. if t == nil {
  178. s = nil
  179. } else if jtype == reflect.Slice {
  180. s = t.([]interface{})
  181. } else if jtype == reflect.String {
  182. err := json.Unmarshal([]byte(t.(string)), &s)
  183. if err != nil {
  184. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  185. }
  186. } else {
  187. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  188. }
  189. if tempArr, err := p.addArrayField(st, s); err != nil {
  190. return fmt.Errorf("fail to parse field %s: %s", n, err)
  191. } else {
  192. r[n] = tempArr
  193. }
  194. case *xsql.RecType:
  195. nextJ := make(map[string]interface{})
  196. if t == nil {
  197. nextJ = nil
  198. r[n] = nextJ
  199. return nil
  200. } else if jtype == reflect.Map {
  201. nextJ, ok = t.(map[string]interface{})
  202. if !ok {
  203. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  204. }
  205. } else if jtype == reflect.String {
  206. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  207. if err != nil {
  208. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  209. }
  210. } else {
  211. return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
  212. }
  213. nextR := make(map[string]interface{})
  214. for _, nextF := range st.StreamFields {
  215. nextP := strings.ToLower(nextF.Name)
  216. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  217. return e
  218. }
  219. }
  220. r[n] = nextR
  221. default:
  222. return fmt.Errorf("unsupported type %T", st)
  223. }
  224. return nil
  225. } else {
  226. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  227. }
  228. }
  229. //ft must be xsql.ArrayType
  230. //side effect: r[p] will be set to the new array
  231. func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  232. if ft.FieldType != nil { //complex type array or struct
  233. switch st := ft.FieldType.(type) { //Only two complex types supported here
  234. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  235. if srcSlice == nil {
  236. return [][]interface{}(nil), nil
  237. }
  238. var s []interface{}
  239. var tempSlice reflect.Value
  240. for i, t := range srcSlice {
  241. jtype := reflect.ValueOf(t).Kind()
  242. if t == nil {
  243. s = nil
  244. } else if jtype == reflect.Slice || jtype == reflect.Array {
  245. s = t.([]interface{})
  246. } else if jtype == reflect.String {
  247. err := json.Unmarshal([]byte(t.(string)), &s)
  248. if err != nil {
  249. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  250. }
  251. } else {
  252. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  253. }
  254. if tempArr, err := p.addArrayField(st, s); err != nil {
  255. return nil, err
  256. } else {
  257. if !tempSlice.IsValid() {
  258. s := reflect.TypeOf(tempArr)
  259. tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
  260. }
  261. tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
  262. }
  263. }
  264. return tempSlice.Interface(), nil
  265. case *xsql.RecType:
  266. if srcSlice == nil {
  267. return []map[string]interface{}(nil), nil
  268. }
  269. tempSlice := make([]map[string]interface{}, 0)
  270. for i, t := range srcSlice {
  271. jtype := reflect.ValueOf(t).Kind()
  272. j := make(map[string]interface{})
  273. var ok bool
  274. if t == nil {
  275. j = nil
  276. tempSlice = append(tempSlice, j)
  277. continue
  278. } else if jtype == reflect.Map {
  279. j, ok = t.(map[string]interface{})
  280. if !ok {
  281. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  282. }
  283. } else if jtype == reflect.String {
  284. err := json.Unmarshal([]byte(t.(string)), &j)
  285. if err != nil {
  286. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  287. }
  288. } else {
  289. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  290. }
  291. r := make(map[string]interface{})
  292. for _, f := range st.StreamFields {
  293. n := f.Name
  294. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  295. return nil, e
  296. }
  297. }
  298. tempSlice = append(tempSlice, r)
  299. }
  300. return tempSlice, nil
  301. default:
  302. return nil, fmt.Errorf("unsupported type %T", st)
  303. }
  304. } else { //basic type
  305. switch ft.Type {
  306. case xsql.UNKNOWN:
  307. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  308. case xsql.BIGINT:
  309. if srcSlice == nil {
  310. return []int(nil), nil
  311. }
  312. tempSlice := make([]int, 0)
  313. for i, t := range srcSlice {
  314. jtype := reflect.ValueOf(t).Kind()
  315. if jtype == reflect.Float64 {
  316. tempSlice = append(tempSlice, int(t.(float64)))
  317. } else if jtype == reflect.String {
  318. if v, err := strconv.Atoi(t.(string)); err != nil {
  319. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  320. } else {
  321. tempSlice = append(tempSlice, v)
  322. }
  323. } else {
  324. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  325. }
  326. }
  327. return tempSlice, nil
  328. case xsql.FLOAT:
  329. if srcSlice == nil {
  330. return []float64(nil), nil
  331. }
  332. tempSlice := make([]float64, 0)
  333. for i, t := range srcSlice {
  334. jtype := reflect.ValueOf(t).Kind()
  335. if jtype == reflect.Float64 {
  336. tempSlice = append(tempSlice, t.(float64))
  337. } else if jtype == reflect.String {
  338. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  339. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  340. } else {
  341. tempSlice = append(tempSlice, f)
  342. }
  343. } else {
  344. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  345. }
  346. }
  347. return tempSlice, nil
  348. case xsql.STRINGS:
  349. if srcSlice == nil {
  350. return []string(nil), nil
  351. }
  352. tempSlice := make([]string, 0)
  353. for i, t := range srcSlice {
  354. if reflect.ValueOf(t).Kind() == reflect.String {
  355. tempSlice = append(tempSlice, t.(string))
  356. } else {
  357. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
  358. }
  359. }
  360. return tempSlice, nil
  361. case xsql.DATETIME:
  362. if srcSlice == nil {
  363. return []time.Time(nil), nil
  364. }
  365. tempSlice := make([]time.Time, 0)
  366. for i, t := range srcSlice {
  367. jtype := reflect.ValueOf(t).Kind()
  368. switch jtype {
  369. case reflect.Int:
  370. ai := t.(int64)
  371. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  372. case reflect.Float64:
  373. ai := int64(t.(float64))
  374. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  375. case reflect.String:
  376. if ai, err := p.parseTime(t.(string)); err != nil {
  377. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
  378. } else {
  379. tempSlice = append(tempSlice, ai)
  380. }
  381. default:
  382. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
  383. }
  384. }
  385. return tempSlice, nil
  386. case xsql.BOOLEAN:
  387. if srcSlice == nil {
  388. return []bool(nil), nil
  389. }
  390. tempSlice := make([]bool, 0)
  391. for i, t := range srcSlice {
  392. jtype := reflect.ValueOf(t).Kind()
  393. if jtype == reflect.Bool {
  394. tempSlice = append(tempSlice, t.(bool))
  395. } else if jtype == reflect.String {
  396. if v, err := strconv.ParseBool(t.(string)); err != nil {
  397. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  398. } else {
  399. tempSlice = append(tempSlice, v)
  400. }
  401. } else {
  402. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  403. }
  404. }
  405. return tempSlice, nil
  406. default:
  407. return nil, fmt.Errorf("invalid data type for %T", ft.Type)
  408. }
  409. }
  410. }