planner.go
package planner

import (
	"fmt"
	"github.com/emqx/kuiper/common"
	"github.com/emqx/kuiper/xsql"
	"github.com/emqx/kuiper/xstream"
	"github.com/emqx/kuiper/xstream/api"
	"github.com/emqx/kuiper/xstream/nodes"
	"github.com/emqx/kuiper/xstream/operators"
	"path"
	"strings"
)
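// Plan parses the rule's SQL, builds and optimizes a logical plan, and turns it
// into a runnable stream topology backed by the stream definitions stored under storePath.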
func Plan(rule *api.Rule, storePath string) (*xstream.TopologyNew, error) {
	return PlanWithSourcesAndSinks(rule, storePath, nil, nil)
}
// PlanWithSourcesAndSinks is exposed for testing only: it allows mock source and
// sink nodes to be injected instead of the ones defined by the rule.
func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*nodes.SourceNode, sinks []*nodes.SinkNode) (*xstream.TopologyNew, error) {
	sql := rule.Sql
	common.Log.Infof("Init rule with options %+v", rule.Options)
	stmt, err := xsql.GetStatementFromSql(sql)
	if err != nil {
		return nil, err
	}
	// Validation
	streamsFromStmt := xsql.GetStreams(stmt)
	if len(sources) > 0 && len(sources) != len(streamsFromStmt) {
		return nil, fmt.Errorf("Invalid parameter sources or streams: the length does not match the statement, expect %d sources.", len(streamsFromStmt))
	}
	if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || stmt.Dimensions != nil) {
		return nil, fmt.Errorf("Invalid option sendMetaToSink, it cannot be applied to a window")
	}
	store := common.GetSqliteKVStore(path.Join(storePath, "stream"))
	err = store.Open()
	if err != nil {
		return nil, err
	}
	defer store.Close()
	// Create the logical plan and optimize it. Logical plans form a linked list.
	lp, err := createLogicalPlan(stmt, rule.Options)
	if err != nil {
		return nil, err
	}
	tp, err := createTopo(rule, lp, sources, sinks, store, streamsFromStmt)
	if err != nil {
		return nil, err
	}
	return tp, nil
}
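// createTopo converts the optimized logical plan into a physical topology, then
// appends one sink node per rule action (or the injected mock sinks in tests).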
func createTopo(rule *api.Rule, lp LogicalPlan, sources []*nodes.SourceNode, sinks []*nodes.SinkNode, store common.KeyValue, streamsFromStmt []string) (*xstream.TopologyNew, error) {
	// Create topology
	tp, err := xstream.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval)
	if err != nil {
		return nil, err
	}
	input, _, err := buildOps(lp, tp, rule.Options, sources, store, streamsFromStmt, 0)
	if err != nil {
		return nil, err
	}
	inputs := []api.Emitter{input}
	// Add actions
	if len(sinks) > 0 { // For use of mock sinks in testing
		for _, sink := range sinks {
			tp.AddSink(inputs, sink)
		}
	} else {
		for i, m := range rule.Actions {
			for name, action := range m {
				props, ok := action.(map[string]interface{})
				if !ok {
					return nil, fmt.Errorf("expect map[string]interface{} type for the action properties, but found %v", action)
				}
				tp.AddSink(inputs, nodes.NewSinkNode(fmt.Sprintf("%s_%d", name, i), name, props))
			}
		}
	}
	return tp, nil
}
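// buildOps walks the logical plan bottom-up, emitting a physical operator for each
// plan node and wiring it to the emitters produced by its children. It returns the
// operator for the current node together with the running operator index used to
// generate unique operator names.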
func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption, sources []*nodes.SourceNode, store common.KeyValue, streamsFromStmt []string, index int) (api.Emitter, int, error) {
	var inputs []api.Emitter
	newIndex := index
	for _, c := range lp.Children() {
		input, ni, err := buildOps(c, tp, options, sources, store, streamsFromStmt, newIndex)
		if err != nil {
			return nil, 0, err
		}
		newIndex = ni
		inputs = append(inputs, input)
	}
	newIndex++
	var (
		op  nodes.OperatorNode
		err error
	)
	switch t := lp.(type) {
	case *DataSourcePlan:
		streamStmt, err := getStream(store, t.name)
		if err != nil {
			return nil, 0, fmt.Errorf("fail to get stream %s, please check if stream is created", t.name)
		}
		isBinary := false
		if f, ok := streamStmt.Options["FORMAT"]; ok {
			if strings.ToLower(f) == common.FORMAT_BINARY {
				isBinary = true
			}
		}
		pp, err := operators.NewPreprocessor(streamStmt, t.alias, options.IsEventTime, isBinary)
		if err != nil {
			return nil, 0, err
		}
		var srcNode *nodes.SourceNode
		if len(sources) == 0 {
			node := nodes.NewSourceNode(t.name, streamStmt.Options)
			srcNode = node
		} else {
			found := false
			for _, source := range sources {
				if t.name == source.GetName() {
					srcNode = source
					found = true
				}
			}
			if !found {
				return nil, 0, fmt.Errorf("can't find predefined source %s", t.name)
			}
		}
		tp.AddSrc(srcNode)
		op = xstream.Transform(pp, fmt.Sprintf("%d_preprocessor_%s", newIndex, t.name), options.BufferLength)
		inputs = []api.Emitter{srcNode}
	case *WindowPlan:
		if t.condition != nil {
			wfilterOp := xstream.Transform(&operators.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_windowFilter", newIndex), options.BufferLength)
			wfilterOp.SetConcurrency(options.Concurrency)
			tp.AddOperator(inputs, wfilterOp)
			inputs = []api.Emitter{wfilterOp}
		}
		op, err = nodes.NewWindowOp(fmt.Sprintf("%d_window", newIndex), nodes.WindowConfig{
			Type:     t.wtype,
			Length:   t.length,
			Interval: t.interval,
		}, t.isEventTime, options.LateTol, streamsFromStmt, options.BufferLength)
		if err != nil {
			return nil, 0, err
		}
	case *JoinPlan:
		op = xstream.Transform(&operators.JoinOp{Joins: t.joins, From: t.from}, fmt.Sprintf("%d_join", newIndex), options.BufferLength)
	case *FilterPlan:
		op = xstream.Transform(&operators.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_filter", newIndex), options.BufferLength)
	case *AggregatePlan:
		op = xstream.Transform(&operators.AggregateOp{Dimensions: t.dimensions, Alias: t.alias}, fmt.Sprintf("%d_aggregate", newIndex), options.BufferLength)
	case *HavingPlan:
		op = xstream.Transform(&operators.HavingOp{Condition: t.condition}, fmt.Sprintf("%d_having", newIndex), options.BufferLength)
	case *OrderPlan:
		op = xstream.Transform(&operators.OrderOp{SortFields: t.SortFields}, fmt.Sprintf("%d_order", newIndex), options.BufferLength)
	case *ProjectPlan:
		op = xstream.Transform(&operators.ProjectOp{Fields: t.fields, IsAggregate: t.isAggregate, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options.BufferLength)
	default:
		return nil, 0, fmt.Errorf("unknown logical plan %v", t)
	}
	if uop, ok := op.(*nodes.UnaryOperator); ok {
		uop.SetConcurrency(options.Concurrency)
	}
	tp.AddOperator(inputs, op)
	return op, newIndex, nil
}
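// getStream looks up the stream definition SQL stored under name and parses it
// back into a StreamStmt.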
func getStream(m common.KeyValue, name string) (stmt *xsql.StreamStmt, err error) {
	var s string
	f, err := m.Get(name, &s)
	if !f || err != nil {
		return nil, fmt.Errorf("Cannot find key %s. ", name)
	}
	parser := xsql.NewParser(strings.NewReader(s))
	stream, err := xsql.Language.Parse(parser)
	stmt, ok := stream.(*xsql.StreamStmt)
	if !ok {
		err = fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", name)
	}
	return
}
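// createLogicalPlan translates the SELECT statement into a chain of logical plan
// nodes (source -> window -> join -> filter -> aggregate -> having -> order -> project)
// and hands the result to the optimizer.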
func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption) (LogicalPlan, error) {
	streamsFromStmt := xsql.GetStreams(stmt)
	dimensions := stmt.Dimensions
	var (
		p                     LogicalPlan
		children              []LogicalPlan
		w                     *xsql.Window
		ds                    xsql.Dimensions
		alias, aggregateAlias xsql.Fields
	)
	// Separate aliased fields: non-aggregate aliases are handed to the data source
	// preprocessor, aggregate aliases to the aggregate plan.
	for _, f := range stmt.Fields {
		if f.AName != "" {
			if !xsql.HasAggFuncs(f.Expr) {
				alias = append(alias, f)
			} else {
				aggregateAlias = append(aggregateAlias, f)
			}
		}
	}
	for _, s := range streamsFromStmt {
		p = DataSourcePlan{
			name:       s,
			isWildCard: true,
			needMeta:   opt.SendMetaToSink,
			fields:     nil,
			metaFields: nil,
			alias:      alias,
		}.Init()
		children = append(children, p)
	}
	if dimensions != nil {
		w = dimensions.GetWindow()
		if w != nil {
			wp := WindowPlan{
				wtype:       w.WindowType,
				length:      w.Length.Val,
				isEventTime: opt.IsEventTime,
			}.Init()
			if w.Interval != nil {
				wp.interval = w.Interval.Val
			} else if w.WindowType == xsql.COUNT_WINDOW {
				// If no interval is set for a count window, default the interval to the length value.
				wp.interval = w.Length.Val
			}
			if w.Filter != nil {
				wp.condition = w.Filter
			}
			// TODO calculate limit
			// TODO incremental aggregate
			wp.SetChildren(children)
			children = []LogicalPlan{wp}
			p = wp
		}
	}
	if w != nil && stmt.Joins != nil {
		// TODO extract on filter
		p = JoinPlan{
			from:  stmt.Sources[0].(*xsql.Table),
			joins: stmt.Joins,
		}.Init()
		p.SetChildren(children)
		children = []LogicalPlan{p}
	}
	if stmt.Condition != nil {
		p = FilterPlan{
			condition: stmt.Condition,
		}.Init()
		p.SetChildren(children)
		children = []LogicalPlan{p}
	}
	// TODO handle aggregateAlias in optimization as it does not only happen in select fields
	if dimensions != nil || len(aggregateAlias) > 0 {
		ds = dimensions.GetGroups()
		if (ds != nil && len(ds) > 0) || len(aggregateAlias) > 0 {
			p = AggregatePlan{
				dimensions: ds,
				alias:      aggregateAlias,
			}.Init()
			p.SetChildren(children)
			children = []LogicalPlan{p}
		}
	}
	if stmt.Having != nil {
		p = HavingPlan{
			condition: stmt.Having,
		}.Init()
		p.SetChildren(children)
		children = []LogicalPlan{p}
	}
	if stmt.SortFields != nil {
		p = OrderPlan{
			SortFields: stmt.SortFields,
		}.Init()
		p.SetChildren(children)
		children = []LogicalPlan{p}
	}
	if stmt.Fields != nil {
		p = ProjectPlan{
			fields:      stmt.Fields,
			isAggregate: xsql.IsAggStatement(stmt),
			sendMeta:    opt.SendMetaToSink,
		}.Init()
		p.SetChildren(children)
		children = []LogicalPlan{p}
	}
	return optimize(p)
}
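
// Example usage (illustrative sketch only; the rule definition and store path below
// are assumptions for demonstration, not part of this package):
//
//	rule := &api.Rule{
//		Id:      "rule1",
//		Sql:     "SELECT * FROM demo",
//		Options: &api.RuleOption{BufferLength: 1024},
//		Actions: []map[string]interface{}{{"log": map[string]interface{}{}}},
//	}
//	tp, err := Plan(rule, "/tmp/kuiper/data")
//	if err != nil {
//		// handle the planning error
//	}
//	_ = tp // the returned topology can then be opened to run the rule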