preprocessor.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
  13. type Preprocessor struct {
  14. streamStmt *xsql.StreamStmt
  15. fields xsql.Fields
  16. isEventTime bool
  17. timestampField string
  18. timestampFormat string
  19. }
  20. func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocessor, error) {
  21. p := &Preprocessor{streamStmt: s, fields: fs, isEventTime: iet}
  22. if iet {
  23. if tf, ok := s.Options["TIMESTAMP"]; ok {
  24. p.timestampField = tf
  25. } else {
  26. return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
  27. }
  28. if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok {
  29. p.timestampFormat = ts
  30. }
  31. }
  32. return p, nil
  33. }
  34. /*
  35. * input: *xsql.Tuple
  36. * output: *xsql.Tuple
  37. */
  38. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{} {
  39. log := ctx.GetLogger()
  40. tuple, ok := data.(*xsql.Tuple)
  41. if !ok {
  42. log.Errorf("Expect tuple data type")
  43. return nil
  44. }
  45. log.Debugf("preprocessor receive %s", tuple.Message)
  46. result := make(map[string]interface{})
  47. for _, f := range p.streamStmt.StreamFields {
  48. fname := strings.ToLower(f.Name)
  49. if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil {
  50. log.Errorf("error in preprocessor: %s", e)
  51. return nil
  52. }
  53. }
  54. //If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
  55. //Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
  56. for _, f := range p.fields {
  57. if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
  58. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
  59. if v := ve.Eval(f.Expr); v != nil {
  60. result[strings.ToLower(f.AName)] = v
  61. }
  62. }
  63. }
  64. tuple.Message = result
  65. if p.isEventTime {
  66. if t, ok := result[p.timestampField]; ok {
  67. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  68. log.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  69. return nil
  70. } else {
  71. tuple.Timestamp = ts
  72. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  73. }
  74. } else {
  75. log.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  76. return nil
  77. }
  78. }
  79. return tuple
  80. }
  81. func (p *Preprocessor) parseTime(s string) (time.Time, error) {
  82. if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok {
  83. return common.ParseTime(s, f)
  84. } else {
  85. return time.Parse(common.JSISO, s)
  86. }
  87. }
  88. func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j map[string]interface{}, n string) error {
  89. if t, ok := j[n]; ok {
  90. v := reflect.ValueOf(t)
  91. jtype := v.Kind()
  92. switch st := ft.(type) {
  93. case *xsql.BasicType:
  94. switch st.Type {
  95. case xsql.UNKNOWN:
  96. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  97. case xsql.BIGINT:
  98. if jtype == reflect.Int {
  99. r[n] = t.(int)
  100. } else if jtype == reflect.Float64 {
  101. r[n] = int(t.(float64))
  102. } else if jtype == reflect.String {
  103. if i, err := strconv.Atoi(t.(string)); err != nil {
  104. return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
  105. } else {
  106. r[n] = i
  107. }
  108. } else {
  109. return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
  110. }
  111. case xsql.FLOAT:
  112. if jtype == reflect.Float64 {
  113. r[n] = t.(float64)
  114. } else if jtype == reflect.String {
  115. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  116. return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
  117. } else {
  118. r[n] = f
  119. }
  120. } else {
  121. return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
  122. }
  123. case xsql.STRINGS:
  124. if jtype == reflect.String {
  125. r[n] = t.(string)
  126. } else {
  127. return fmt.Errorf("invalid data type for %s, expect string but found %s", n, t)
  128. }
  129. case xsql.DATETIME:
  130. switch jtype {
  131. case reflect.Int:
  132. ai := t.(int64)
  133. r[n] = common.TimeFromUnixMilli(ai)
  134. case reflect.Float64:
  135. ai := int64(t.(float64))
  136. r[n] = common.TimeFromUnixMilli(ai)
  137. case reflect.String:
  138. if t, err := p.parseTime(t.(string)); err != nil {
  139. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  140. } else {
  141. r[n] = t
  142. }
  143. default:
  144. return fmt.Errorf("invalid data type for %s, expect datatime but find %v", n, t)
  145. }
  146. case xsql.BOOLEAN:
  147. if jtype == reflect.Bool {
  148. r[n] = t.(bool)
  149. } else if jtype == reflect.String {
  150. if i, err := strconv.ParseBool(t.(string)); err != nil {
  151. return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
  152. } else {
  153. r[n] = i
  154. }
  155. } else {
  156. return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
  157. }
  158. default:
  159. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  160. }
  161. case *xsql.ArrayType:
  162. var s []interface{}
  163. if jtype == reflect.Slice {
  164. s = t.([]interface{})
  165. } else if jtype == reflect.String {
  166. err := json.Unmarshal([]byte(t.(string)), &s)
  167. if err != nil {
  168. return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
  169. }
  170. } else {
  171. return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
  172. }
  173. if tempArr, err := p.addArrayField(st, s); err != nil {
  174. return err
  175. } else {
  176. r[n] = tempArr
  177. }
  178. case *xsql.RecType:
  179. nextJ := make(map[string]interface{})
  180. if jtype == reflect.Map {
  181. nextJ, ok = t.(map[string]interface{})
  182. if !ok {
  183. return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
  184. }
  185. } else if jtype == reflect.String {
  186. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  187. if err != nil {
  188. return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
  189. }
  190. } else {
  191. return fmt.Errorf("invalid data type for %s, expect struct but found %s", n, t)
  192. }
  193. nextR := make(map[string]interface{})
  194. for _, nextF := range st.StreamFields {
  195. nextP := strings.ToLower(nextF.Name)
  196. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  197. return e
  198. }
  199. }
  200. r[n] = nextR
  201. default:
  202. return fmt.Errorf("unsupported type %T", st)
  203. }
  204. return nil
  205. } else {
  206. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  207. }
  208. }
  209. //ft must be xsql.ArrayType
  210. //side effect: r[p] will be set to the new array
  211. func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  212. if ft.FieldType != nil { //complex type array or struct
  213. switch st := ft.FieldType.(type) { //Only two complex types supported here
  214. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  215. var tempSlice [][]interface{}
  216. var s []interface{}
  217. for i, t := range srcSlice {
  218. jtype := reflect.ValueOf(t).Kind()
  219. if jtype == reflect.Slice || jtype == reflect.Array {
  220. s = t.([]interface{})
  221. } else if jtype == reflect.String {
  222. err := json.Unmarshal([]byte(t.(string)), &s)
  223. if err != nil {
  224. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
  225. }
  226. } else {
  227. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
  228. }
  229. if tempArr, err := p.addArrayField(st, s); err != nil {
  230. return nil, err
  231. } else {
  232. tempSlice = append(tempSlice, tempArr.([]interface{}))
  233. }
  234. }
  235. return tempSlice, nil
  236. case *xsql.RecType:
  237. var tempSlice []map[string]interface{}
  238. for i, t := range srcSlice {
  239. jtype := reflect.ValueOf(t).Kind()
  240. j := make(map[string]interface{})
  241. var ok bool
  242. if jtype == reflect.Map {
  243. j, ok = t.(map[string]interface{})
  244. if !ok {
  245. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
  246. }
  247. } else if jtype == reflect.String {
  248. err := json.Unmarshal([]byte(t.(string)), &j)
  249. if err != nil {
  250. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
  251. }
  252. } else {
  253. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
  254. }
  255. r := make(map[string]interface{})
  256. for _, f := range st.StreamFields {
  257. n := f.Name
  258. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  259. return nil, e
  260. }
  261. }
  262. tempSlice = append(tempSlice, r)
  263. }
  264. return tempSlice, nil
  265. default:
  266. return nil, fmt.Errorf("unsupported type %T", st)
  267. }
  268. } else { //basic type
  269. switch ft.Type {
  270. case xsql.UNKNOWN:
  271. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  272. case xsql.BIGINT:
  273. var tempSlice []int
  274. for i, t := range srcSlice {
  275. jtype := reflect.ValueOf(t).Kind()
  276. if jtype == reflect.Float64 {
  277. tempSlice = append(tempSlice, int(t.(float64)))
  278. } else if jtype == reflect.String {
  279. if v, err := strconv.Atoi(t.(string)); err != nil {
  280. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  281. } else {
  282. tempSlice = append(tempSlice, v)
  283. }
  284. } else {
  285. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  286. }
  287. }
  288. return tempSlice, nil
  289. case xsql.FLOAT:
  290. var tempSlice []float64
  291. for i, t := range srcSlice {
  292. jtype := reflect.ValueOf(t).Kind()
  293. if jtype == reflect.Float64 {
  294. tempSlice = append(tempSlice, t.(float64))
  295. } else if jtype == reflect.String {
  296. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  297. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  298. } else {
  299. tempSlice = append(tempSlice, f)
  300. }
  301. } else {
  302. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  303. }
  304. }
  305. return tempSlice, nil
  306. case xsql.STRINGS:
  307. var tempSlice []string
  308. for i, t := range srcSlice {
  309. if reflect.ValueOf(t).Kind() == reflect.String {
  310. tempSlice = append(tempSlice, t.(string))
  311. } else {
  312. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %s", i, t)
  313. }
  314. }
  315. return tempSlice, nil
  316. case xsql.DATETIME:
  317. var tempSlice []time.Time
  318. for i, t := range srcSlice {
  319. jtype := reflect.ValueOf(t).Kind()
  320. switch jtype {
  321. case reflect.Int:
  322. ai := t.(int64)
  323. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  324. case reflect.Float64:
  325. ai := int64(t.(float64))
  326. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  327. case reflect.String:
  328. if ai, err := p.parseTime(t.(string)); err != nil {
  329. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", t, err)
  330. } else {
  331. tempSlice = append(tempSlice, ai)
  332. }
  333. default:
  334. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %v", i, t)
  335. }
  336. }
  337. return tempSlice, nil
  338. case xsql.BOOLEAN:
  339. var tempSlice []bool
  340. for i, t := range srcSlice {
  341. jtype := reflect.ValueOf(t).Kind()
  342. if jtype == reflect.Bool {
  343. tempSlice = append(tempSlice, t.(bool))
  344. } else if jtype == reflect.String {
  345. if v, err := strconv.ParseBool(t.(string)); err != nil {
  346. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
  347. } else {
  348. tempSlice = append(tempSlice, v)
  349. }
  350. } else {
  351. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
  352. }
  353. }
  354. return tempSlice, nil
  355. default:
  356. return nil, fmt.Errorf("invalid data type for %T, datetime type is not supported yet", ft.Type)
  357. }
  358. }
  359. }