preprocessor.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
  13. type Preprocessor struct {
  14. streamStmt *xsql.StreamStmt
  15. fields xsql.Fields
  16. isEventTime bool
  17. timestampField string
  18. timestampFormat string
  19. }
  20. func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocessor, error) {
  21. p := &Preprocessor{streamStmt: s, fields: fs, isEventTime: iet}
  22. if iet {
  23. if tf, ok := s.Options["TIMESTAMP"]; ok {
  24. p.timestampField = tf
  25. } else {
  26. return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
  27. }
  28. if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok {
  29. p.timestampFormat = ts
  30. }
  31. }
  32. return p, nil
  33. }
  34. /*
  35. * input: *xsql.Tuple
  36. * output: *xsql.Tuple
  37. */
  38. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{} {
  39. log := ctx.GetLogger()
  40. tuple, ok := data.(*xsql.Tuple)
  41. if !ok {
  42. return fmt.Errorf("expect tuple data type")
  43. }
  44. log.Debugf("preprocessor receive %s", tuple.Message)
  45. result := make(map[string]interface{})
  46. if p.streamStmt.StreamFields != nil {
  47. for _, f := range p.streamStmt.StreamFields {
  48. fname := strings.ToLower(f.Name)
  49. if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil {
  50. return fmt.Errorf("error in preprocessor: %s", e)
  51. }
  52. }
  53. } else {
  54. result = tuple.Message
  55. }
  56. //If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
  57. //Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
  58. for _, f := range p.fields {
  59. if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
  60. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
  61. v := ve.Eval(f.Expr)
  62. if _, ok := v.(error); ok {
  63. return v
  64. } else {
  65. result[strings.ToLower(f.AName)] = v
  66. }
  67. }
  68. }
  69. tuple.Message = result
  70. if p.isEventTime {
  71. if t, ok := result[p.timestampField]; ok {
  72. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  73. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  74. } else {
  75. tuple.Timestamp = ts
  76. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  77. }
  78. } else {
  79. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  80. }
  81. }
  82. return tuple
  83. }
  84. func (p *Preprocessor) parseTime(s string) (time.Time, error) {
  85. if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok {
  86. return common.ParseTime(s, f)
  87. } else {
  88. return time.Parse(common.JSISO, s)
  89. }
  90. }
  91. func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j map[string]interface{}, n string) error {
  92. if t, ok := j[n]; ok {
  93. v := reflect.ValueOf(t)
  94. jtype := v.Kind()
  95. switch st := ft.(type) {
  96. case *xsql.BasicType:
  97. switch st.Type {
  98. case xsql.UNKNOWN:
  99. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  100. case xsql.BIGINT:
  101. if jtype == reflect.Int {
  102. r[n] = t.(int)
  103. } else if jtype == reflect.Float64 {
  104. r[n] = int(t.(float64))
  105. } else if jtype == reflect.String {
  106. if i, err := strconv.Atoi(t.(string)); err != nil {
  107. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  108. } else {
  109. r[n] = i
  110. }
  111. } else {
  112. return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
  113. }
  114. case xsql.FLOAT:
  115. if jtype == reflect.Float64 {
  116. r[n] = t.(float64)
  117. } else if jtype == reflect.String {
  118. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  119. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  120. } else {
  121. r[n] = f
  122. }
  123. } else {
  124. return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
  125. }
  126. case xsql.STRINGS:
  127. if jtype == reflect.String {
  128. r[n] = t.(string)
  129. } else {
  130. return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
  131. }
  132. case xsql.DATETIME:
  133. switch jtype {
  134. case reflect.Int:
  135. ai := t.(int64)
  136. r[n] = common.TimeFromUnixMilli(ai)
  137. case reflect.Float64:
  138. ai := int64(t.(float64))
  139. r[n] = common.TimeFromUnixMilli(ai)
  140. case reflect.String:
  141. if t, err := p.parseTime(t.(string)); err != nil {
  142. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  143. } else {
  144. r[n] = t
  145. }
  146. default:
  147. return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
  148. }
  149. case xsql.BOOLEAN:
  150. if jtype == reflect.Bool {
  151. r[n] = t.(bool)
  152. } else if jtype == reflect.String {
  153. if i, err := strconv.ParseBool(t.(string)); err != nil {
  154. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  155. } else {
  156. r[n] = i
  157. }
  158. } else {
  159. return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
  160. }
  161. default:
  162. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  163. }
  164. case *xsql.ArrayType:
  165. var s []interface{}
  166. if t == nil {
  167. s = nil
  168. } else if jtype == reflect.Slice {
  169. s = t.([]interface{})
  170. } else if jtype == reflect.String {
  171. err := json.Unmarshal([]byte(t.(string)), &s)
  172. if err != nil {
  173. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  174. }
  175. } else {
  176. return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
  177. }
  178. if tempArr, err := p.addArrayField(st, s); err != nil {
  179. return fmt.Errorf("fail to parse field %s: %s", n, err)
  180. } else {
  181. r[n] = tempArr
  182. }
  183. case *xsql.RecType:
  184. nextJ := make(map[string]interface{})
  185. if t == nil {
  186. nextJ = nil
  187. r[n] = nextJ
  188. return nil
  189. } else if jtype == reflect.Map {
  190. nextJ, ok = t.(map[string]interface{})
  191. if !ok {
  192. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  193. }
  194. } else if jtype == reflect.String {
  195. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  196. if err != nil {
  197. return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
  198. }
  199. } else {
  200. return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
  201. }
  202. nextR := make(map[string]interface{})
  203. for _, nextF := range st.StreamFields {
  204. nextP := strings.ToLower(nextF.Name)
  205. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  206. return e
  207. }
  208. }
  209. r[n] = nextR
  210. default:
  211. return fmt.Errorf("unsupported type %T", st)
  212. }
  213. return nil
  214. } else {
  215. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  216. }
  217. }
  218. //ft must be xsql.ArrayType
  219. //side effect: r[p] will be set to the new array
  220. func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  221. if ft.FieldType != nil { //complex type array or struct
  222. switch st := ft.FieldType.(type) { //Only two complex types supported here
  223. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  224. if srcSlice == nil {
  225. return [][]interface{}(nil), nil
  226. }
  227. var s []interface{}
  228. var tempSlice reflect.Value
  229. for i, t := range srcSlice {
  230. jtype := reflect.ValueOf(t).Kind()
  231. if t == nil {
  232. s = nil
  233. } else if jtype == reflect.Slice || jtype == reflect.Array {
  234. s = t.([]interface{})
  235. } else if jtype == reflect.String {
  236. err := json.Unmarshal([]byte(t.(string)), &s)
  237. if err != nil {
  238. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  239. }
  240. } else {
  241. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
  242. }
  243. if tempArr, err := p.addArrayField(st, s); err != nil {
  244. return nil, err
  245. } else {
  246. if !tempSlice.IsValid() {
  247. s := reflect.TypeOf(tempArr)
  248. tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
  249. }
  250. tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
  251. }
  252. }
  253. return tempSlice.Interface(), nil
  254. case *xsql.RecType:
  255. if srcSlice == nil {
  256. return []map[string]interface{}(nil), nil
  257. }
  258. tempSlice := make([]map[string]interface{}, 0)
  259. for i, t := range srcSlice {
  260. jtype := reflect.ValueOf(t).Kind()
  261. j := make(map[string]interface{})
  262. var ok bool
  263. if t == nil {
  264. j = nil
  265. tempSlice = append(tempSlice, j)
  266. continue
  267. } else if jtype == reflect.Map {
  268. j, ok = t.(map[string]interface{})
  269. if !ok {
  270. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  271. }
  272. } else if jtype == reflect.String {
  273. err := json.Unmarshal([]byte(t.(string)), &j)
  274. if err != nil {
  275. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  276. }
  277. } else {
  278. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
  279. }
  280. r := make(map[string]interface{})
  281. for _, f := range st.StreamFields {
  282. n := f.Name
  283. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  284. return nil, e
  285. }
  286. }
  287. tempSlice = append(tempSlice, r)
  288. }
  289. return tempSlice, nil
  290. default:
  291. return nil, fmt.Errorf("unsupported type %T", st)
  292. }
  293. } else { //basic type
  294. switch ft.Type {
  295. case xsql.UNKNOWN:
  296. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  297. case xsql.BIGINT:
  298. if srcSlice == nil {
  299. return []int(nil), nil
  300. }
  301. tempSlice := make([]int, 0)
  302. for i, t := range srcSlice {
  303. jtype := reflect.ValueOf(t).Kind()
  304. if jtype == reflect.Float64 {
  305. tempSlice = append(tempSlice, int(t.(float64)))
  306. } else if jtype == reflect.String {
  307. if v, err := strconv.Atoi(t.(string)); err != nil {
  308. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  309. } else {
  310. tempSlice = append(tempSlice, v)
  311. }
  312. } else {
  313. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  314. }
  315. }
  316. return tempSlice, nil
  317. case xsql.FLOAT:
  318. if srcSlice == nil {
  319. return []float64(nil), nil
  320. }
  321. tempSlice := make([]float64, 0)
  322. for i, t := range srcSlice {
  323. jtype := reflect.ValueOf(t).Kind()
  324. if jtype == reflect.Float64 {
  325. tempSlice = append(tempSlice, t.(float64))
  326. } else if jtype == reflect.String {
  327. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  328. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  329. } else {
  330. tempSlice = append(tempSlice, f)
  331. }
  332. } else {
  333. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
  334. }
  335. }
  336. return tempSlice, nil
  337. case xsql.STRINGS:
  338. if srcSlice == nil {
  339. return []string(nil), nil
  340. }
  341. tempSlice := make([]string, 0)
  342. for i, t := range srcSlice {
  343. if reflect.ValueOf(t).Kind() == reflect.String {
  344. tempSlice = append(tempSlice, t.(string))
  345. } else {
  346. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
  347. }
  348. }
  349. return tempSlice, nil
  350. case xsql.DATETIME:
  351. if srcSlice == nil {
  352. return []time.Time(nil), nil
  353. }
  354. tempSlice := make([]time.Time, 0)
  355. for i, t := range srcSlice {
  356. jtype := reflect.ValueOf(t).Kind()
  357. switch jtype {
  358. case reflect.Int:
  359. ai := t.(int64)
  360. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  361. case reflect.Float64:
  362. ai := int64(t.(float64))
  363. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  364. case reflect.String:
  365. if ai, err := p.parseTime(t.(string)); err != nil {
  366. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
  367. } else {
  368. tempSlice = append(tempSlice, ai)
  369. }
  370. default:
  371. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
  372. }
  373. }
  374. return tempSlice, nil
  375. case xsql.BOOLEAN:
  376. if srcSlice == nil {
  377. return []bool(nil), nil
  378. }
  379. tempSlice := make([]bool, 0)
  380. for i, t := range srcSlice {
  381. jtype := reflect.ValueOf(t).Kind()
  382. if jtype == reflect.Bool {
  383. tempSlice = append(tempSlice, t.(bool))
  384. } else if jtype == reflect.String {
  385. if v, err := strconv.ParseBool(t.(string)); err != nil {
  386. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  387. } else {
  388. tempSlice = append(tempSlice, v)
  389. }
  390. } else {
  391. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
  392. }
  393. }
  394. return tempSlice, nil
  395. default:
  396. return nil, fmt.Errorf("invalid data type for %T", ft.Type)
  397. }
  398. }
  399. }