planner.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. package planner
import (
	"errors"
	"fmt"
	"path"
	"sort"
	"strings"

	"github.com/emqx/kuiper/common"
	"github.com/emqx/kuiper/common/kv"
	"github.com/emqx/kuiper/xsql"
	"github.com/emqx/kuiper/xstream"
	"github.com/emqx/kuiper/xstream/api"
	"github.com/emqx/kuiper/xstream/nodes"
	"github.com/emqx/kuiper/xstream/operators"
)
  15. func Plan(rule *api.Rule, storePath string) (*xstream.TopologyNew, error) {
  16. return PlanWithSourcesAndSinks(rule, storePath, nil, nil)
  17. }
  18. // For test only
  19. func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*nodes.SourceNode, sinks []*nodes.SinkNode) (*xstream.TopologyNew, error) {
  20. sql := rule.Sql
  21. common.Log.Infof("Init rule with options %+v", rule.Options)
  22. stmt, err := xsql.GetStatementFromSql(sql)
  23. if err != nil {
  24. return nil, err
  25. }
  26. // validation
  27. streamsFromStmt := xsql.GetStreams(stmt)
  28. //if len(sources) > 0 && len(sources) != len(streamsFromStmt) {
  29. // return nil, fmt.Errorf("Invalid parameter sources or streams, the length cannot match the statement, expect %d sources.", len(streamsFromStmt))
  30. //}
  31. if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || stmt.Dimensions != nil) {
  32. return nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
  33. }
  34. store := kv.GetDefaultKVStore(path.Join(storePath, "stream"))
  35. err = store.Open()
  36. if err != nil {
  37. return nil, err
  38. }
  39. defer store.Close()
  40. // Create logical plan and optimize. Logical plans are a linked list
  41. lp, err := createLogicalPlan(stmt, rule.Options, store)
  42. if err != nil {
  43. return nil, err
  44. }
  45. tp, err := createTopo(rule, lp, sources, sinks, streamsFromStmt)
  46. if err != nil {
  47. return nil, err
  48. }
  49. return tp, nil
  50. }
  51. type aliasInfo struct {
  52. alias xsql.Field
  53. refSources []string
  54. isAggregate bool
  55. }
  56. // Analyze the select statement by decorating the info from stream statement.
  57. // Typically, set the correct stream name for fieldRefs
  58. func decorateStmt(s *xsql.SelectStatement, store kv.KeyValue) ([]*xsql.StreamStmt, map[string]*aliasInfo, error) {
  59. streamsFromStmt := xsql.GetStreams(s)
  60. streamStmts := make([]*xsql.StreamStmt, len(streamsFromStmt))
  61. aliasSourceMap := make(map[string]*aliasInfo)
  62. isSchemaless := false
  63. for i, s := range streamsFromStmt {
  64. streamStmt, err := xsql.GetDataSource(store, s)
  65. if err != nil {
  66. return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
  67. }
  68. streamStmts[i] = streamStmt
  69. if streamStmt.StreamFields == nil {
  70. isSchemaless = true
  71. }
  72. }
  73. var walkErr error
  74. for _, f := range s.Fields {
  75. if f.AName != "" {
  76. if _, ok := aliasSourceMap[strings.ToLower(f.AName)]; ok {
  77. return nil, nil, fmt.Errorf("duplicate alias %s", f.AName)
  78. }
  79. refStreams := make(map[string]struct{})
  80. xsql.WalkFunc(f.Expr, func(n xsql.Node) {
  81. switch expr := n.(type) {
  82. case *xsql.FieldRef:
  83. err := updateFieldRefStream(expr, streamStmts, isSchemaless)
  84. if err != nil {
  85. walkErr = err
  86. return
  87. }
  88. if expr.StreamName != "" {
  89. refStreams[string(expr.StreamName)] = struct{}{}
  90. }
  91. }
  92. })
  93. if walkErr != nil {
  94. return nil, nil, walkErr
  95. }
  96. refStreamKeys := make([]string, len(refStreams))
  97. c := 0
  98. for k, _ := range refStreams {
  99. refStreamKeys[c] = k
  100. c++
  101. }
  102. aliasSourceMap[strings.ToLower(f.AName)] = &aliasInfo{
  103. alias: f,
  104. refSources: refStreamKeys,
  105. isAggregate: xsql.HasAggFuncs(f.Expr),
  106. }
  107. }
  108. }
  109. // Select fields are visited firstly to make sure all aliases have streamName set
  110. xsql.WalkFunc(s, func(n xsql.Node) {
  111. //skip alias field
  112. switch f := n.(type) {
  113. case *xsql.Field:
  114. if f.AName != "" {
  115. return
  116. }
  117. case *xsql.FieldRef:
  118. if f.StreamName == xsql.DEFAULT_STREAM {
  119. for aname, ainfo := range aliasSourceMap {
  120. if strings.EqualFold(f.Name, aname) {
  121. switch len(ainfo.refSources) {
  122. case 0: // if no ref source, we can put it to any stream, here just assign it to the first stream
  123. f.StreamName = streamStmts[0].Name
  124. case 1:
  125. f.StreamName = xsql.StreamName(ainfo.refSources[0])
  126. default:
  127. f.StreamName = xsql.MULTI_STREAM
  128. }
  129. return
  130. }
  131. }
  132. }
  133. err := updateFieldRefStream(f, streamStmts, isSchemaless)
  134. if err != nil {
  135. walkErr = err
  136. }
  137. }
  138. })
  139. return streamStmts, aliasSourceMap, walkErr
  140. }
  141. func updateFieldRefStream(f *xsql.FieldRef, streamStmts []*xsql.StreamStmt, isSchemaless bool) (err error) {
  142. count := 0
  143. for _, streamStmt := range streamStmts {
  144. for _, field := range streamStmt.StreamFields {
  145. if strings.EqualFold(f.Name, field.Name) {
  146. if f.StreamName == xsql.DEFAULT_STREAM {
  147. f.StreamName = streamStmt.Name
  148. count++
  149. } else if f.StreamName == streamStmt.Name {
  150. count++
  151. }
  152. break
  153. }
  154. }
  155. }
  156. if count > 1 {
  157. err = fmt.Errorf("ambiguous field %s", f.Name)
  158. } else if count == 0 && f.StreamName == xsql.DEFAULT_STREAM { // alias may refer to non stream field
  159. if !isSchemaless {
  160. err = fmt.Errorf("unknown field %s.%s", f.StreamName, f.Name)
  161. } else if len(streamStmts) == 1 { // If only one schemaless stream, all the fields must be a field of that stream
  162. f.StreamName = streamStmts[0].Name
  163. }
  164. }
  165. return
  166. }
  167. func createTopo(rule *api.Rule, lp LogicalPlan, sources []*nodes.SourceNode, sinks []*nodes.SinkNode, streamsFromStmt []string) (*xstream.TopologyNew, error) {
  168. // Create topology
  169. tp, err := xstream.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval)
  170. if err != nil {
  171. return nil, err
  172. }
  173. input, _, err := buildOps(lp, tp, rule.Options, sources, streamsFromStmt, 0)
  174. if err != nil {
  175. return nil, err
  176. }
  177. inputs := []api.Emitter{input}
  178. // Add actions
  179. if len(sinks) > 0 { // For use of mock sink in testing
  180. for _, sink := range sinks {
  181. tp.AddSink(inputs, sink)
  182. }
  183. } else {
  184. for i, m := range rule.Actions {
  185. for name, action := range m {
  186. props, ok := action.(map[string]interface{})
  187. if !ok {
  188. return nil, fmt.Errorf("expect map[string]interface{} type for the action properties, but found %v", action)
  189. }
  190. tp.AddSink(inputs, nodes.NewSinkNode(fmt.Sprintf("%s_%d", name, i), name, props))
  191. }
  192. }
  193. }
  194. return tp, nil
  195. }
  196. func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption, sources []*nodes.SourceNode, streamsFromStmt []string, index int) (api.Emitter, int, error) {
  197. var inputs []api.Emitter
  198. newIndex := index
  199. for _, c := range lp.Children() {
  200. input, ni, err := buildOps(c, tp, options, sources, streamsFromStmt, newIndex)
  201. if err != nil {
  202. return nil, 0, err
  203. }
  204. newIndex = ni
  205. inputs = append(inputs, input)
  206. }
  207. newIndex++
  208. var (
  209. op nodes.OperatorNode
  210. err error
  211. )
  212. switch t := lp.(type) {
  213. case *DataSourcePlan:
  214. switch t.streamStmt.StreamType {
  215. case xsql.TypeStream:
  216. pp, err := operators.NewPreprocessor(t.streamFields, t.alias, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary)
  217. if err != nil {
  218. return nil, 0, err
  219. }
  220. var srcNode *nodes.SourceNode
  221. if len(sources) == 0 {
  222. node := nodes.NewSourceNode(t.name, t.streamStmt.StreamType, t.streamStmt.Options)
  223. srcNode = node
  224. } else {
  225. srcNode = getMockSource(sources, t.name)
  226. if srcNode == nil {
  227. return nil, 0, fmt.Errorf("can't find predefined source %s", t.name)
  228. }
  229. }
  230. tp.AddSrc(srcNode)
  231. op = Transform(pp, fmt.Sprintf("%d_preprocessor_%s", newIndex, t.name), options)
  232. inputs = []api.Emitter{srcNode}
  233. case xsql.TypeTable:
  234. pp, err := operators.NewTableProcessor(t.name, t.streamFields, t.alias, t.streamStmt.Options)
  235. if err != nil {
  236. return nil, 0, err
  237. }
  238. var srcNode *nodes.SourceNode
  239. if len(sources) > 0 {
  240. srcNode = getMockSource(sources, t.name)
  241. }
  242. if srcNode == nil {
  243. srcNode = nodes.NewSourceNode(t.name, t.streamStmt.StreamType, t.streamStmt.Options)
  244. }
  245. tp.AddSrc(srcNode)
  246. op = Transform(pp, fmt.Sprintf("%d_tableprocessor_%s", newIndex, t.name), options)
  247. inputs = []api.Emitter{srcNode}
  248. }
  249. case *WindowPlan:
  250. if t.condition != nil {
  251. wfilterOp := Transform(&operators.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_windowFilter", newIndex), options)
  252. wfilterOp.SetConcurrency(options.Concurrency)
  253. tp.AddOperator(inputs, wfilterOp)
  254. inputs = []api.Emitter{wfilterOp}
  255. }
  256. op, err = nodes.NewWindowOp(fmt.Sprintf("%d_window", newIndex), nodes.WindowConfig{
  257. Type: t.wtype,
  258. Length: t.length,
  259. Interval: t.interval,
  260. }, streamsFromStmt, options)
  261. if err != nil {
  262. return nil, 0, err
  263. }
  264. case *JoinAlignPlan:
  265. op, err = nodes.NewJoinAlignNode(fmt.Sprintf("%d_join_aligner", newIndex), t.Emitters, options)
  266. case *JoinPlan:
  267. op = Transform(&operators.JoinOp{Joins: t.joins, From: t.from}, fmt.Sprintf("%d_join", newIndex), options)
  268. case *FilterPlan:
  269. op = Transform(&operators.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_filter", newIndex), options)
  270. case *AggregatePlan:
  271. op = Transform(&operators.AggregateOp{Dimensions: t.dimensions, Alias: t.alias}, fmt.Sprintf("%d_aggregate", newIndex), options)
  272. case *HavingPlan:
  273. op = Transform(&operators.HavingOp{Condition: t.condition}, fmt.Sprintf("%d_having", newIndex), options)
  274. case *OrderPlan:
  275. op = Transform(&operators.OrderOp{SortFields: t.SortFields}, fmt.Sprintf("%d_order", newIndex), options)
  276. case *ProjectPlan:
  277. op = Transform(&operators.ProjectOp{Fields: t.fields, IsAggregate: t.isAggregate, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
  278. default:
  279. return nil, 0, fmt.Errorf("unknown logical plan %v", t)
  280. }
  281. if uop, ok := op.(*nodes.UnaryOperator); ok {
  282. uop.SetConcurrency(options.Concurrency)
  283. }
  284. tp.AddOperator(inputs, op)
  285. return op, newIndex, nil
  286. }
  287. func getMockSource(sources []*nodes.SourceNode, name string) *nodes.SourceNode {
  288. for _, source := range sources {
  289. if name == source.GetName() {
  290. return source
  291. }
  292. }
  293. return nil
  294. }
  295. func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {
  296. dimensions := stmt.Dimensions
  297. var (
  298. p LogicalPlan
  299. children []LogicalPlan
  300. // If there are tables, the plan graph will be different for join/window
  301. tableChildren []LogicalPlan
  302. tableEmitters []string
  303. w *xsql.Window
  304. ds xsql.Dimensions
  305. )
  306. streamStmts, aliasMap, err := decorateStmt(stmt, store)
  307. if err != nil {
  308. return nil, err
  309. }
  310. for i, streamStmt := range streamStmts {
  311. p = DataSourcePlan{
  312. name: string(streamStmt.Name),
  313. streamStmt: streamStmt,
  314. iet: opt.IsEventTime,
  315. alias: aliasFieldsForSource(aliasMap, streamStmt.Name, i == 0),
  316. allMeta: opt.SendMetaToSink,
  317. }.Init()
  318. if streamStmt.StreamType == xsql.TypeStream {
  319. children = append(children, p)
  320. } else {
  321. tableChildren = append(tableChildren, p)
  322. tableEmitters = append(tableEmitters, string(streamStmt.Name))
  323. }
  324. }
  325. aggregateAlias, _ := complexAlias(aliasMap)
  326. if dimensions != nil {
  327. w = dimensions.GetWindow()
  328. if w != nil {
  329. if len(children) == 0 {
  330. return nil, errors.New("cannot run window for TABLE sources")
  331. }
  332. wp := WindowPlan{
  333. wtype: w.WindowType,
  334. length: w.Length.Val,
  335. isEventTime: opt.IsEventTime,
  336. }.Init()
  337. if w.Interval != nil {
  338. wp.interval = w.Interval.Val
  339. } else if w.WindowType == xsql.COUNT_WINDOW {
  340. //if no interval value is set and it's count window, then set interval to length value.
  341. wp.interval = w.Length.Val
  342. }
  343. if w.Filter != nil {
  344. wp.condition = w.Filter
  345. }
  346. // TODO calculate limit
  347. // TODO incremental aggregate
  348. wp.SetChildren(children)
  349. children = []LogicalPlan{wp}
  350. p = wp
  351. }
  352. }
  353. if stmt.Joins != nil {
  354. if len(tableChildren) > 0 {
  355. p = JoinAlignPlan{
  356. Emitters: tableEmitters,
  357. }.Init()
  358. p.SetChildren(append(children, tableChildren...))
  359. children = []LogicalPlan{p}
  360. } else if w == nil {
  361. return nil, errors.New("need to run stream join in windows")
  362. }
  363. // TODO extract on filter
  364. p = JoinPlan{
  365. from: stmt.Sources[0].(*xsql.Table),
  366. joins: stmt.Joins,
  367. }.Init()
  368. p.SetChildren(children)
  369. children = []LogicalPlan{p}
  370. }
  371. if stmt.Condition != nil {
  372. p = FilterPlan{
  373. condition: stmt.Condition,
  374. }.Init()
  375. p.SetChildren(children)
  376. children = []LogicalPlan{p}
  377. }
  378. // TODO handle aggregateAlias in optimization as it does not only happen in select fields
  379. if dimensions != nil || len(aggregateAlias) > 0 {
  380. ds = dimensions.GetGroups()
  381. if (ds != nil && len(ds) > 0) || len(aggregateAlias) > 0 {
  382. p = AggregatePlan{
  383. dimensions: ds,
  384. alias: aggregateAlias,
  385. }.Init()
  386. p.SetChildren(children)
  387. children = []LogicalPlan{p}
  388. }
  389. }
  390. if stmt.Having != nil {
  391. p = HavingPlan{
  392. condition: stmt.Having,
  393. }.Init()
  394. p.SetChildren(children)
  395. children = []LogicalPlan{p}
  396. }
  397. if stmt.SortFields != nil {
  398. p = OrderPlan{
  399. SortFields: stmt.SortFields,
  400. }.Init()
  401. p.SetChildren(children)
  402. children = []LogicalPlan{p}
  403. }
  404. if stmt.Fields != nil {
  405. p = ProjectPlan{
  406. fields: stmt.Fields,
  407. isAggregate: xsql.IsAggStatement(stmt),
  408. sendMeta: opt.SendMetaToSink,
  409. }.Init()
  410. p.SetChildren(children)
  411. }
  412. return optimize(p)
  413. }
  414. func aliasFieldsForSource(aliasMap map[string]*aliasInfo, name xsql.StreamName, isFirst bool) (result xsql.Fields) {
  415. for _, ainfo := range aliasMap {
  416. if ainfo.isAggregate {
  417. continue
  418. }
  419. switch len(ainfo.refSources) {
  420. case 0:
  421. if isFirst {
  422. result = append(result, ainfo.alias)
  423. }
  424. case 1:
  425. if strings.EqualFold(ainfo.refSources[0], string(name)) {
  426. result = append(result, ainfo.alias)
  427. }
  428. }
  429. }
  430. return
  431. }
  432. func complexAlias(aliasMap map[string]*aliasInfo) (aggregateAlias xsql.Fields, joinAlias xsql.Fields) {
  433. for _, ainfo := range aliasMap {
  434. if ainfo.isAggregate {
  435. aggregateAlias = append(aggregateAlias, ainfo.alias)
  436. continue
  437. }
  438. if len(ainfo.refSources) > 1 {
  439. joinAlias = append(joinAlias, ainfo.alias)
  440. }
  441. }
  442. return
  443. }
  444. func Transform(op nodes.UnOperation, name string, options *api.RuleOption) *nodes.UnaryOperator {
  445. operator := nodes.New(name, xsql.FuncRegisters, options)
  446. operator.SetOperation(op)
  447. return operator
  448. }