preprocessor.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
  // Preprocessor converts incoming tuples according to the stream definition
  // and, when event time is enabled, assigns the tuple timestamp from a field
  // of the message.
  type Preprocessor struct {
  	// streamStmt is the stream definition; its StreamFields drive the type
  	// conversion and its Options supply TIMESTAMP / TIMESTAMP_FORMAT.
  	streamStmt *xsql.StreamStmt
  	// fields are the select fields; non-aggregate fields that carry an alias
  	// are evaluated in Apply and stored in the message under the alias.
  	fields xsql.Fields
  	// isEventTime makes Apply read the tuple timestamp from timestampField.
  	isEventTime bool
  	// timestampField is the value of the TIMESTAMP stream option (only set
  	// when isEventTime is true).
  	timestampField string
  	// timestampFormat is the value of the optional TIMESTAMP_FORMAT stream
  	// option (only set when isEventTime is true).
  	timestampFormat string
  }
  20. func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocessor, error) {
  21. p := &Preprocessor{streamStmt: s, fields: fs, isEventTime: iet}
  22. if iet {
  23. if tf, ok := s.Options["TIMESTAMP"]; ok {
  24. p.timestampField = tf
  25. } else {
  26. return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
  27. }
  28. if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok {
  29. p.timestampFormat = ts
  30. }
  31. }
  32. return p, nil
  33. }
  34. /*
  35. * input: *xsql.Tuple
  36. * output: *xsql.Tuple
  37. */
  38. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{} {
  39. log := ctx.GetLogger()
  40. tuple, ok := data.(*xsql.Tuple)
  41. if !ok {
  42. log.Errorf("Expect tuple data type")
  43. return nil
  44. }
  45. log.Debugf("preprocessor receive %s", tuple.Message)
  46. result := make(map[string]interface{})
  47. if p.streamStmt.StreamFields != nil {
  48. for _, f := range p.streamStmt.StreamFields {
  49. fname := strings.ToLower(f.Name)
  50. if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil {
  51. log.Errorf("error in preprocessor: %s", e)
  52. return nil
  53. }
  54. }
  55. } else {
  56. result = tuple.Message
  57. }
  58. //If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
  59. //Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
  60. for _, f := range p.fields {
  61. if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
  62. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
  63. if v := ve.Eval(f.Expr); v != nil {
  64. result[strings.ToLower(f.AName)] = v
  65. }
  66. }
  67. }
  68. tuple.Message = result
  69. if p.isEventTime {
  70. if t, ok := result[p.timestampField]; ok {
  71. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  72. log.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  73. return nil
  74. } else {
  75. tuple.Timestamp = ts
  76. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  77. }
  78. } else {
  79. log.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  80. return nil
  81. }
  82. }
  83. return tuple
  84. }
  85. func (p *Preprocessor) parseTime(s string) (time.Time, error) {
  86. if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok {
  87. return common.ParseTime(s, f)
  88. } else {
  89. return time.Parse(common.JSISO, s)
  90. }
  91. }
  92. func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j map[string]interface{}, n string) error {
  93. if t, ok := j[n]; ok {
  94. v := reflect.ValueOf(t)
  95. jtype := v.Kind()
  96. switch st := ft.(type) {
  97. case *xsql.BasicType:
  98. switch st.Type {
  99. case xsql.UNKNOWN:
  100. return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
  101. case xsql.BIGINT:
  102. if jtype == reflect.Int {
  103. r[n] = t.(int)
  104. } else if jtype == reflect.Float64 {
  105. r[n] = int(t.(float64))
  106. } else if jtype == reflect.String {
  107. if i, err := strconv.Atoi(t.(string)); err != nil {
  108. return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
  109. } else {
  110. r[n] = i
  111. }
  112. } else {
  113. return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
  114. }
  115. case xsql.FLOAT:
  116. if jtype == reflect.Float64 {
  117. r[n] = t.(float64)
  118. } else if jtype == reflect.String {
  119. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  120. return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
  121. } else {
  122. r[n] = f
  123. }
  124. } else {
  125. return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
  126. }
  127. case xsql.STRINGS:
  128. if jtype == reflect.String {
  129. r[n] = t.(string)
  130. } else {
  131. return fmt.Errorf("invalid data type for %s, expect string but found %s", n, t)
  132. }
  133. case xsql.DATETIME:
  134. switch jtype {
  135. case reflect.Int:
  136. ai := t.(int64)
  137. r[n] = common.TimeFromUnixMilli(ai)
  138. case reflect.Float64:
  139. ai := int64(t.(float64))
  140. r[n] = common.TimeFromUnixMilli(ai)
  141. case reflect.String:
  142. if t, err := p.parseTime(t.(string)); err != nil {
  143. return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
  144. } else {
  145. r[n] = t
  146. }
  147. default:
  148. return fmt.Errorf("invalid data type for %s, expect datatime but find %v", n, t)
  149. }
  150. case xsql.BOOLEAN:
  151. if jtype == reflect.Bool {
  152. r[n] = t.(bool)
  153. } else if jtype == reflect.String {
  154. if i, err := strconv.ParseBool(t.(string)); err != nil {
  155. return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
  156. } else {
  157. r[n] = i
  158. }
  159. } else {
  160. return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
  161. }
  162. default:
  163. return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
  164. }
  165. case *xsql.ArrayType:
  166. var s []interface{}
  167. if jtype == reflect.Slice {
  168. s = t.([]interface{})
  169. } else if jtype == reflect.String {
  170. err := json.Unmarshal([]byte(t.(string)), &s)
  171. if err != nil {
  172. return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
  173. }
  174. } else {
  175. return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
  176. }
  177. if tempArr, err := p.addArrayField(st, s); err != nil {
  178. return err
  179. } else {
  180. r[n] = tempArr
  181. }
  182. case *xsql.RecType:
  183. nextJ := make(map[string]interface{})
  184. if jtype == reflect.Map {
  185. nextJ, ok = t.(map[string]interface{})
  186. if !ok {
  187. return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
  188. }
  189. } else if jtype == reflect.String {
  190. err := json.Unmarshal([]byte(t.(string)), &nextJ)
  191. if err != nil {
  192. return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
  193. }
  194. } else {
  195. return fmt.Errorf("invalid data type for %s, expect struct but found %s", n, t)
  196. }
  197. nextR := make(map[string]interface{})
  198. for _, nextF := range st.StreamFields {
  199. nextP := strings.ToLower(nextF.Name)
  200. if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
  201. return e
  202. }
  203. }
  204. r[n] = nextR
  205. default:
  206. return fmt.Errorf("unsupported type %T", st)
  207. }
  208. return nil
  209. } else {
  210. return fmt.Errorf("invalid data %s, field %s not found", j, n)
  211. }
  212. }
  213. //ft must be xsql.ArrayType
  214. //side effect: r[p] will be set to the new array
  215. func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{}) (interface{}, error) {
  216. if ft.FieldType != nil { //complex type array or struct
  217. switch st := ft.FieldType.(type) { //Only two complex types supported here
  218. case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
  219. var tempSlice [][]interface{}
  220. var s []interface{}
  221. for i, t := range srcSlice {
  222. jtype := reflect.ValueOf(t).Kind()
  223. if jtype == reflect.Slice || jtype == reflect.Array {
  224. s = t.([]interface{})
  225. } else if jtype == reflect.String {
  226. err := json.Unmarshal([]byte(t.(string)), &s)
  227. if err != nil {
  228. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
  229. }
  230. } else {
  231. return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
  232. }
  233. if tempArr, err := p.addArrayField(st, s); err != nil {
  234. return nil, err
  235. } else {
  236. tempSlice = append(tempSlice, tempArr.([]interface{}))
  237. }
  238. }
  239. return tempSlice, nil
  240. case *xsql.RecType:
  241. var tempSlice []map[string]interface{}
  242. for i, t := range srcSlice {
  243. jtype := reflect.ValueOf(t).Kind()
  244. j := make(map[string]interface{})
  245. var ok bool
  246. if jtype == reflect.Map {
  247. j, ok = t.(map[string]interface{})
  248. if !ok {
  249. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
  250. }
  251. } else if jtype == reflect.String {
  252. err := json.Unmarshal([]byte(t.(string)), &j)
  253. if err != nil {
  254. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
  255. }
  256. } else {
  257. return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
  258. }
  259. r := make(map[string]interface{})
  260. for _, f := range st.StreamFields {
  261. n := f.Name
  262. if e := p.addRecField(f.FieldType, r, j, n); e != nil {
  263. return nil, e
  264. }
  265. }
  266. tempSlice = append(tempSlice, r)
  267. }
  268. return tempSlice, nil
  269. default:
  270. return nil, fmt.Errorf("unsupported type %T", st)
  271. }
  272. } else { //basic type
  273. switch ft.Type {
  274. case xsql.UNKNOWN:
  275. return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
  276. case xsql.BIGINT:
  277. var tempSlice []int
  278. for i, t := range srcSlice {
  279. jtype := reflect.ValueOf(t).Kind()
  280. if jtype == reflect.Float64 {
  281. tempSlice = append(tempSlice, int(t.(float64)))
  282. } else if jtype == reflect.String {
  283. if v, err := strconv.Atoi(t.(string)); err != nil {
  284. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  285. } else {
  286. tempSlice = append(tempSlice, v)
  287. }
  288. } else {
  289. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  290. }
  291. }
  292. return tempSlice, nil
  293. case xsql.FLOAT:
  294. var tempSlice []float64
  295. for i, t := range srcSlice {
  296. jtype := reflect.ValueOf(t).Kind()
  297. if jtype == reflect.Float64 {
  298. tempSlice = append(tempSlice, t.(float64))
  299. } else if jtype == reflect.String {
  300. if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
  301. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  302. } else {
  303. tempSlice = append(tempSlice, f)
  304. }
  305. } else {
  306. return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
  307. }
  308. }
  309. return tempSlice, nil
  310. case xsql.STRINGS:
  311. var tempSlice []string
  312. for i, t := range srcSlice {
  313. if reflect.ValueOf(t).Kind() == reflect.String {
  314. tempSlice = append(tempSlice, t.(string))
  315. } else {
  316. return nil, fmt.Errorf("invalid data type for [%d], expect string but found %s", i, t)
  317. }
  318. }
  319. return tempSlice, nil
  320. case xsql.DATETIME:
  321. var tempSlice []time.Time
  322. for i, t := range srcSlice {
  323. jtype := reflect.ValueOf(t).Kind()
  324. switch jtype {
  325. case reflect.Int:
  326. ai := t.(int64)
  327. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  328. case reflect.Float64:
  329. ai := int64(t.(float64))
  330. tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
  331. case reflect.String:
  332. if ai, err := p.parseTime(t.(string)); err != nil {
  333. return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", t, err)
  334. } else {
  335. tempSlice = append(tempSlice, ai)
  336. }
  337. default:
  338. return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %v", i, t)
  339. }
  340. }
  341. return tempSlice, nil
  342. case xsql.BOOLEAN:
  343. var tempSlice []bool
  344. for i, t := range srcSlice {
  345. jtype := reflect.ValueOf(t).Kind()
  346. if jtype == reflect.Bool {
  347. tempSlice = append(tempSlice, t.(bool))
  348. } else if jtype == reflect.String {
  349. if v, err := strconv.ParseBool(t.(string)); err != nil {
  350. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
  351. } else {
  352. tempSlice = append(tempSlice, v)
  353. }
  354. } else {
  355. return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
  356. }
  357. }
  358. return tempSlice, nil
  359. default:
  360. return nil, fmt.Errorf("invalid data type for %T, datetime type is not supported yet", ft.Type)
  361. }
  362. }
  363. }