dataSourcePlan.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package planner
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/internal/conf"
  5. "github.com/emqx/kuiper/pkg/ast"
  6. "github.com/emqx/kuiper/pkg/message"
  7. "sort"
  8. "strings"
  9. )
  10. type DataSourcePlan struct {
  11. baseLogicalPlan
  12. name ast.StreamName
  13. // calculated properties
  14. // initialized with stream definition, pruned with rule
  15. streamFields []interface{}
  16. metaFields []string
  17. // passon properties
  18. streamStmt *ast.StreamStmt
  19. allMeta bool
  20. isBinary bool
  21. iet bool
  22. timestampFormat string
  23. timestampField string
  24. // intermediate status
  25. isWildCard bool
  26. fields map[string]interface{}
  27. metaMap map[string]string
  28. }
  29. func (p DataSourcePlan) Init() *DataSourcePlan {
  30. p.baseLogicalPlan.self = &p
  31. return &p
  32. }
  33. // Presume no children for data source
  34. func (p *DataSourcePlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
  35. owned, other := p.extract(condition)
  36. if owned != nil {
  37. // Add a filter plan for children
  38. f := FilterPlan{
  39. condition: owned,
  40. }.Init()
  41. f.SetChildren([]LogicalPlan{p})
  42. return other, f
  43. }
  44. return other, p
  45. }
  46. func (p *DataSourcePlan) extract(expr ast.Expr) (ast.Expr, ast.Expr) {
  47. s, hasDefault := getRefSources(expr)
  48. l := len(s)
  49. if hasDefault {
  50. l += 1
  51. }
  52. switch len(s) {
  53. case 0:
  54. return expr, nil
  55. case 1:
  56. if s[0] == p.name || s[0] == ast.DefaultStream {
  57. return expr, nil
  58. } else {
  59. return nil, expr
  60. }
  61. default:
  62. if be, ok := expr.(*ast.BinaryExpr); ok && be.OP == ast.AND {
  63. ul, pl := p.extract(be.LHS)
  64. ur, pr := p.extract(be.RHS)
  65. owned := combine(ul, ur)
  66. other := combine(pl, pr)
  67. return owned, other
  68. }
  69. return nil, expr
  70. }
  71. }
  72. func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
  73. //init values
  74. p.getProps()
  75. p.fields = make(map[string]interface{})
  76. if !p.allMeta {
  77. p.metaMap = make(map[string]string)
  78. }
  79. if p.timestampField != "" {
  80. p.fields[p.timestampField] = p.timestampField
  81. }
  82. for _, field := range fields {
  83. switch f := field.(type) {
  84. case *ast.Wildcard:
  85. p.isWildCard = true
  86. case *ast.FieldRef:
  87. if !p.isWildCard && (f.StreamName == ast.DefaultStream || f.StreamName == p.name) {
  88. if _, ok := p.fields[f.Name]; !ok {
  89. sf := p.getField(f.Name)
  90. if sf != nil {
  91. p.fields[f.Name] = sf
  92. }
  93. }
  94. }
  95. case *ast.MetaRef:
  96. if p.allMeta {
  97. break
  98. }
  99. if f.StreamName == ast.DefaultStream || f.StreamName == p.name {
  100. if f.Name == "*" {
  101. p.allMeta = true
  102. p.metaMap = nil
  103. } else if !p.allMeta {
  104. p.metaMap[strings.ToLower(f.Name)] = f.Name
  105. }
  106. }
  107. case *ast.SortField:
  108. if !p.isWildCard {
  109. sf := p.getField(f.Name)
  110. if sf != nil {
  111. p.fields[f.Name] = sf
  112. }
  113. }
  114. default:
  115. return fmt.Errorf("unsupported field %v", field)
  116. }
  117. }
  118. p.getAllFields()
  119. return nil
  120. }
  121. func (p *DataSourcePlan) getField(name string) interface{} {
  122. if p.streamStmt.StreamFields != nil {
  123. for _, f := range p.streamStmt.StreamFields { // The input can only be StreamFields
  124. if f.Name == name {
  125. return &f
  126. }
  127. }
  128. } else {
  129. return name
  130. }
  131. return nil
  132. }
  133. func (p *DataSourcePlan) getAllFields() {
  134. // convert fields
  135. p.streamFields = make([]interface{}, 0)
  136. if p.isWildCard {
  137. if p.streamStmt.StreamFields != nil {
  138. for k, _ := range p.streamStmt.StreamFields { // The input can only be StreamFields
  139. p.streamFields = append(p.streamFields, &p.streamStmt.StreamFields[k])
  140. }
  141. } else {
  142. p.streamFields = nil
  143. }
  144. } else {
  145. sfs := make([]interface{}, 0, len(p.fields))
  146. if conf.IsTesting {
  147. var keys []string
  148. for k, _ := range p.fields {
  149. keys = append(keys, k)
  150. }
  151. sort.Strings(keys)
  152. for _, k := range keys {
  153. sfs = append(sfs, p.fields[k])
  154. }
  155. } else {
  156. for _, v := range p.fields {
  157. sfs = append(sfs, v)
  158. }
  159. }
  160. p.streamFields = sfs
  161. }
  162. p.metaFields = make([]string, 0, len(p.metaMap))
  163. for _, v := range p.metaMap {
  164. p.metaFields = append(p.metaFields, v)
  165. }
  166. // for consistency of results for testing
  167. sort.Strings(p.metaFields)
  168. p.fields = nil
  169. p.metaMap = nil
  170. }
  171. func (p *DataSourcePlan) getProps() error {
  172. if p.iet {
  173. if p.streamStmt.Options.TIMESTAMP != "" {
  174. p.timestampField = p.streamStmt.Options.TIMESTAMP
  175. } else {
  176. return fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
  177. }
  178. if p.streamStmt.Options.TIMESTAMP_FORMAT != "" {
  179. p.timestampFormat = p.streamStmt.Options.TIMESTAMP_FORMAT
  180. }
  181. }
  182. if strings.ToLower(p.streamStmt.Options.FORMAT) == message.FormatBinary {
  183. p.isBinary = true
  184. }
  185. return nil
  186. }