planner_graph.go 17 KB


  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "errors"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/binder/function"
  19. "github.com/lf-edge/ekuiper/internal/topo"
  20. "github.com/lf-edge/ekuiper/internal/topo/graph"
  21. "github.com/lf-edge/ekuiper/internal/topo/node"
  22. "github.com/lf-edge/ekuiper/internal/topo/operator"
  23. "github.com/lf-edge/ekuiper/internal/xsql"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. "github.com/lf-edge/ekuiper/pkg/ast"
  26. "github.com/lf-edge/ekuiper/pkg/cast"
  27. "github.com/lf-edge/ekuiper/pkg/message"
  28. "strings"
  29. )
  30. // PlanByGraph returns a topo.Topo object by a graph
  31. func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
  32. ruleGraph := rule.Graph
  33. if ruleGraph == nil {
  34. return nil, errors.New("no graph")
  35. }
  36. tp, err := topo.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval)
  37. if err != nil {
  38. return nil, err
  39. }
  40. var (
  41. nodeMap = make(map[string]api.TopNode)
  42. sinks = make(map[string]bool)
  43. sources = make(map[string]bool)
  44. )
  45. for nodeName, gn := range ruleGraph.Nodes {
  46. switch gn.Type {
  47. case "source":
  48. if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
  49. return nil, fmt.Errorf("no edge defined for source node %s", nodeName)
  50. }
  51. sourceType, ok := gn.Props["source_type"]
  52. if !ok {
  53. sourceType = "stream"
  54. }
  55. st, ok := sourceType.(string)
  56. if !ok {
  57. return nil, fmt.Errorf("source_type %v is not string", sourceType)
  58. }
  59. st = strings.ToLower(st)
  60. sourceOption := &ast.Options{}
  61. err := cast.MapToStruct(gn.Props, sourceOption)
  62. if err != nil {
  63. return nil, err
  64. }
  65. sourceOption.TYPE = gn.NodeType
  66. switch st {
  67. case "stream":
  68. // TODO deal with conf key
  69. pp, err := operator.NewPreprocessor(true, nil, true, nil, rule.Options.IsEventTime, sourceOption.TIMESTAMP, sourceOption.TIMESTAMP_FORMAT, strings.EqualFold(sourceOption.FORMAT, message.FormatBinary), sourceOption.STRICT_VALIDATION)
  70. if err != nil {
  71. return nil, err
  72. }
  73. srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError)
  74. nodeMap[nodeName] = srcNode
  75. tp.AddSrc(srcNode)
  76. case "table":
  77. // TODO add table
  78. return nil, fmt.Errorf("table source is not supported yet")
  79. default:
  80. return nil, fmt.Errorf("unknown source type %s", st)
  81. }
  82. sources[nodeName] = true
  83. case "sink":
  84. if _, ok := ruleGraph.Topo.Edges[nodeName]; ok {
  85. return nil, fmt.Errorf("sink %s has edge", nodeName)
  86. }
  87. nodeMap[nodeName] = node.NewSinkNode(nodeName, gn.NodeType, gn.Props)
  88. sinks[nodeName] = true
  89. case "operator":
  90. if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
  91. return nil, fmt.Errorf("no edge defined for operator node %s", nodeName)
  92. }
  93. switch strings.ToLower(gn.NodeType) {
  94. case "function":
  95. fop, err := parseFunc(gn.Props)
  96. if err != nil {
  97. return nil, err
  98. }
  99. op := Transform(fop, nodeName, rule.Options)
  100. nodeMap[nodeName] = op
  101. case "aggfunc":
  102. fop, err := parseFunc(gn.Props)
  103. if err != nil {
  104. return nil, err
  105. }
  106. fop.IsAgg = true
  107. op := Transform(fop, nodeName, rule.Options)
  108. nodeMap[nodeName] = op
  109. case "filter":
  110. fop, err := parseFilter(gn.Props)
  111. if err != nil {
  112. return nil, err
  113. }
  114. op := Transform(fop, nodeName, rule.Options)
  115. nodeMap[nodeName] = op
  116. case "pick":
  117. pop, err := parsePick(gn.Props)
  118. if err != nil {
  119. return nil, err
  120. }
  121. op := Transform(pop, nodeName, rule.Options)
  122. nodeMap[nodeName] = op
  123. case "window":
  124. wconf, err := parseWindow(gn.Props)
  125. if err != nil {
  126. return nil, err
  127. }
  128. op, err := node.NewWindowOp(nodeName, *wconf, ruleGraph.Topo.Sources, rule.Options)
  129. if err != nil {
  130. return nil, err
  131. }
  132. nodeMap[nodeName] = op
  133. case "join":
  134. jop, err := parseJoin(gn.Props)
  135. if err != nil {
  136. return nil, err
  137. }
  138. op := Transform(jop, nodeName, rule.Options)
  139. nodeMap[nodeName] = op
  140. case "groupby":
  141. gop, err := parseGroupBy(gn.Props)
  142. if err != nil {
  143. return nil, err
  144. }
  145. op := Transform(gop, nodeName, rule.Options)
  146. nodeMap[nodeName] = op
  147. case "orderby":
  148. oop, err := parseOrderBy(gn.Props)
  149. if err != nil {
  150. return nil, err
  151. }
  152. op := Transform(oop, nodeName, rule.Options)
  153. nodeMap[nodeName] = op
  154. case "switch":
  155. sconf, err := parseSwitch(gn.Props)
  156. if err != nil {
  157. return nil, fmt.Errorf("parse switch %s error: %v", nodeName, err)
  158. }
  159. op, err := node.NewSwitchNode(nodeName, sconf, rule.Options)
  160. if err != nil {
  161. return nil, fmt.Errorf("create switch %s error: %v", nodeName, err)
  162. }
  163. nodeMap[nodeName] = op
  164. default: // TODO other node type
  165. return nil, fmt.Errorf("unknown operator type %s", gn.NodeType)
  166. }
  167. default:
  168. return nil, fmt.Errorf("unknown node type %s", gn.Type)
  169. }
  170. }
  171. // validate source node
  172. for _, nodeName := range ruleGraph.Topo.Sources {
  173. if _, ok := sources[nodeName]; !ok {
  174. return nil, fmt.Errorf("source %s is not a source type node", nodeName)
  175. }
  176. }
  177. // reverse edges, value is a 2-dim array. Only switch node will have the second dim
  178. reversedEdges := make(map[string][][]string)
  179. rclone := make(map[string][]string)
  180. for fromNode, toNodes := range ruleGraph.Topo.Edges {
  181. if _, ok := ruleGraph.Nodes[fromNode]; !ok {
  182. return nil, fmt.Errorf("node %s is not defined", fromNode)
  183. }
  184. for i, toNode := range toNodes {
  185. switch tn := toNode.(type) {
  186. case string:
  187. if _, ok := ruleGraph.Nodes[tn]; !ok {
  188. return nil, fmt.Errorf("node %s is not defined", tn)
  189. }
  190. if _, ok := reversedEdges[tn]; !ok {
  191. reversedEdges[tn] = make([][]string, 1)
  192. }
  193. reversedEdges[tn][0] = append(reversedEdges[tn][0], fromNode)
  194. rclone[tn] = append(rclone[tn], fromNode)
  195. case []interface{}:
  196. for _, tni := range tn {
  197. tnn, ok := tni.(string)
  198. if !ok { // never happen
  199. return nil, fmt.Errorf("invalid edge toNode %v", toNode)
  200. }
  201. if _, ok := ruleGraph.Nodes[tnn]; !ok {
  202. return nil, fmt.Errorf("node %s is not defined", tnn)
  203. }
  204. for len(reversedEdges[tnn]) <= i {
  205. reversedEdges[tnn] = append(reversedEdges[tnn], []string{})
  206. }
  207. reversedEdges[tnn][i] = append(reversedEdges[tnn][i], fromNode)
  208. rclone[tnn] = append(rclone[tnn], fromNode)
  209. }
  210. }
  211. }
  212. }
  213. // sort the nodes by topological order
  214. nodesInOrder := make([]string, len(ruleGraph.Nodes))
  215. i := 0
  216. genNodesInOrder(ruleGraph.Topo.Sources, ruleGraph.Topo.Edges, rclone, nodesInOrder, i)
  217. // validate the typo
  218. // the map is to record the output for each node
  219. dataFlow := make(map[string]*graph.IOType)
  220. for _, n := range nodesInOrder {
  221. gn := ruleGraph.Nodes[n]
  222. if gn == nil {
  223. return nil, fmt.Errorf("can't find node %s", n)
  224. }
  225. if gn.Type == "source" {
  226. dataFlow[n] = &graph.IOType{
  227. Type: graph.IOINPUT_TYPE_ROW,
  228. RowType: graph.IOROW_TYPE_SINGLE,
  229. CollectionType: graph.IOCOLLECTION_TYPE_ANY,
  230. AllowMulti: false,
  231. }
  232. } else if gn.Type == "sink" {
  233. continue
  234. } else {
  235. nodeIO, ok := graph.OpIO[strings.ToLower(gn.NodeType)]
  236. if !ok {
  237. return nil, fmt.Errorf("can't find the io definiton for node type %s", gn.NodeType)
  238. }
  239. dataInCondition := nodeIO[0]
  240. indim := reversedEdges[n]
  241. var innodes []string
  242. for _, in := range indim {
  243. innodes = append(innodes, in...)
  244. }
  245. if len(innodes) > 1 {
  246. if dataInCondition.AllowMulti {
  247. for _, innode := range innodes {
  248. _, err = graph.Fit(dataFlow[innode], dataInCondition)
  249. if err != nil {
  250. return nil, fmt.Errorf("node %s output does not match node %s input: %v", innode, n, err)
  251. }
  252. }
  253. } else {
  254. return nil, fmt.Errorf("operator %s of type %s does not allow multiple inputs", n, gn.NodeType)
  255. }
  256. } else if len(innodes) == 1 {
  257. _, err := graph.Fit(dataFlow[innodes[0]], dataInCondition)
  258. if err != nil {
  259. return nil, fmt.Errorf("node %s output does not match node %s input: %v", innodes[0], n, err)
  260. }
  261. } else {
  262. return nil, fmt.Errorf("operator %s of type %s has no input", n, gn.NodeType)
  263. }
  264. out := nodeIO[1]
  265. in := dataFlow[innodes[0]]
  266. dataFlow[n] = graph.MapOut(in, out)
  267. // convert filter to having if the input is aggregated
  268. if gn.NodeType == "filter" && in.Type == graph.IOINPUT_TYPE_COLLECTION && in.CollectionType == graph.IOCOLLECTION_TYPE_GROUPED {
  269. fop, err := parseHaving(gn.Props)
  270. if err != nil {
  271. return nil, err
  272. }
  273. op := Transform(fop, n, rule.Options)
  274. nodeMap[n] = op
  275. }
  276. }
  277. }
  278. // add the linkages
  279. for nodeName, fromNodes := range reversedEdges {
  280. totalLen := 0
  281. for _, fromNode := range fromNodes {
  282. totalLen += len(fromNode)
  283. }
  284. inputs := make([]api.Emitter, 0, totalLen)
  285. for i, fromNode := range fromNodes {
  286. for _, from := range fromNode {
  287. if i == 0 {
  288. inputs = append(inputs, nodeMap[from].(api.Emitter))
  289. } else {
  290. switch sn := nodeMap[from].(type) {
  291. case *node.SwitchNode:
  292. inputs = append(inputs, sn.GetEmitter(i))
  293. default:
  294. return nil, fmt.Errorf("node %s is not a switch node but have multiple output", from)
  295. }
  296. }
  297. }
  298. }
  299. n := nodeMap[nodeName]
  300. if n == nil {
  301. return nil, fmt.Errorf("node %s is not defined", nodeName)
  302. }
  303. if _, ok := sinks[nodeName]; ok {
  304. tp.AddSink(inputs, n.(*node.SinkNode))
  305. } else {
  306. tp.AddOperator(inputs, n.(node.OperatorNode))
  307. }
  308. }
  309. return tp, nil
  310. }
  311. func genNodesInOrder(toNodes []string, edges map[string][]interface{}, flatReversedEdges map[string][]string, nodesInOrder []string, i int) int {
  312. for _, src := range toNodes {
  313. if len(flatReversedEdges[src]) > 1 {
  314. flatReversedEdges[src] = flatReversedEdges[src][1:]
  315. continue
  316. }
  317. nodesInOrder[i] = src
  318. i++
  319. tns := make([]string, 0, len(edges[src]))
  320. for _, toNode := range edges[src] {
  321. switch toNode.(type) {
  322. case string:
  323. tns = append(tns, toNode.(string))
  324. case []interface{}:
  325. for _, tni := range toNode.([]interface{}) {
  326. tns = append(tns, tni.(string))
  327. }
  328. }
  329. }
  330. i = genNodesInOrder(tns, edges, flatReversedEdges, nodesInOrder, i)
  331. }
  332. return i
  333. }
  334. func parseOrderBy(props map[string]interface{}) (*operator.OrderOp, error) {
  335. n := &graph.Orderby{}
  336. err := cast.MapToStruct(props, n)
  337. if err != nil {
  338. return nil, err
  339. }
  340. stmt := "SELECT * FROM unknown ORDER BY"
  341. for _, s := range n.Sorts {
  342. stmt += " " + s.Field + " "
  343. if s.Desc {
  344. stmt += "DESC"
  345. }
  346. }
  347. p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
  348. if err != nil {
  349. return nil, fmt.Errorf("invalid order by statement error: %v", err)
  350. }
  351. if len(p.SortFields) == 0 {
  352. return nil, fmt.Errorf("order by statement is empty")
  353. }
  354. return &operator.OrderOp{
  355. SortFields: p.SortFields,
  356. }, nil
  357. }
  358. func parseGroupBy(props map[string]interface{}) (*operator.AggregateOp, error) {
  359. n := &graph.Groupby{}
  360. err := cast.MapToStruct(props, n)
  361. if err != nil {
  362. return nil, err
  363. }
  364. if len(n.Dimensions) == 0 {
  365. return nil, fmt.Errorf("groupby must have at least one dimension")
  366. }
  367. stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
  368. p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
  369. if err != nil {
  370. return nil, fmt.Errorf("invalid join statement error: %v", err)
  371. }
  372. return &operator.AggregateOp{Dimensions: p.Dimensions}, nil
  373. }
  374. func parseJoin(props map[string]interface{}) (*operator.JoinOp, error) {
  375. n := &graph.Join{}
  376. err := cast.MapToStruct(props, n)
  377. if err != nil {
  378. return nil, err
  379. }
  380. stmt := "SELECT * FROM " + n.From
  381. for _, join := range n.Joins {
  382. stmt += " " + join.Type + " JOIN ON " + join.On
  383. }
  384. p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
  385. if err != nil {
  386. return nil, fmt.Errorf("invalid join statement error: %v", err)
  387. }
  388. return &operator.JoinOp{Joins: p.Joins, From: p.Sources[0].(*ast.Table)}, nil
  389. }
  390. func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
  391. n := &graph.Window{}
  392. err := cast.MapToStruct(props, n)
  393. if err != nil {
  394. return nil, err
  395. }
  396. if n.Size <= 0 {
  397. return nil, fmt.Errorf("window size %d is invalid", n.Size)
  398. }
  399. var (
  400. wt ast.WindowType
  401. length int
  402. interval int
  403. )
  404. switch strings.ToLower(n.Type) {
  405. case "tumblingwindow":
  406. wt = ast.TUMBLING_WINDOW
  407. if n.Interval != 0 && n.Interval != n.Size {
  408. return nil, fmt.Errorf("tumbling window interval must equal to size")
  409. }
  410. case "hoppingwindow":
  411. wt = ast.HOPPING_WINDOW
  412. if n.Interval <= 0 {
  413. return nil, fmt.Errorf("hopping window interval must be greater than 0")
  414. }
  415. if n.Interval > n.Size {
  416. return nil, fmt.Errorf("hopping window interval must be less than size")
  417. }
  418. case "sessionwindow":
  419. wt = ast.SESSION_WINDOW
  420. if n.Interval <= 0 {
  421. return nil, fmt.Errorf("hopping window interval must be greater than 0")
  422. }
  423. case "slidingwindow":
  424. wt = ast.SLIDING_WINDOW
  425. if n.Interval != 0 && n.Interval != n.Size {
  426. return nil, fmt.Errorf("tumbling window interval must equal to size")
  427. }
  428. case "countwindow":
  429. wt = ast.COUNT_WINDOW
  430. if n.Interval < 0 {
  431. return nil, fmt.Errorf("count window interval must be greater or equal to 0")
  432. }
  433. if n.Interval > n.Size {
  434. return nil, fmt.Errorf("count window interval must be less than size")
  435. }
  436. if n.Interval == 0 {
  437. n.Interval = n.Size
  438. }
  439. default:
  440. return nil, fmt.Errorf("unknown window type %s", n.Type)
  441. }
  442. if wt == ast.COUNT_WINDOW {
  443. length = n.Size
  444. interval = n.Interval
  445. } else {
  446. var unit = 1
  447. switch strings.ToLower(n.Unit) {
  448. case "dd":
  449. unit = 24 * 3600 * 1000
  450. case "hh":
  451. unit = 3600 * 1000
  452. case "mi":
  453. unit = 60 * 1000
  454. case "ss":
  455. unit = 1000
  456. case "ms":
  457. unit = 1
  458. default:
  459. return nil, fmt.Errorf("Invalid unit %s", n.Unit)
  460. }
  461. length = n.Size * unit
  462. interval = n.Interval * unit
  463. }
  464. return &node.WindowConfig{
  465. Type: wt,
  466. Length: length,
  467. Interval: interval,
  468. }, nil
  469. }
  470. func parsePick(props map[string]interface{}) (*operator.ProjectOp, error) {
  471. n := &graph.Select{}
  472. err := cast.MapToStruct(props, n)
  473. if err != nil {
  474. return nil, err
  475. }
  476. stmt, err := xsql.NewParser(strings.NewReader("select " + strings.Join(n.Fields, ",") + " from nonexist")).Parse()
  477. if err != nil {
  478. return nil, err
  479. }
  480. t := ProjectPlan{
  481. fields: stmt.Fields,
  482. isAggregate: xsql.IsAggStatement(stmt),
  483. }.Init()
  484. return &operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, nil
  485. }
  486. func parseFunc(props map[string]interface{}) (*operator.FuncOp, error) {
  487. m, ok := props["expr"]
  488. if !ok {
  489. return nil, errors.New("no expr")
  490. }
  491. funcExpr, ok := m.(string)
  492. if !ok {
  493. return nil, fmt.Errorf("expr %v is not string", m)
  494. }
  495. stmt, err := xsql.NewParser(strings.NewReader("select " + funcExpr + " from nonexist")).Parse()
  496. if err != nil {
  497. return nil, err
  498. }
  499. f := stmt.Fields[0]
  500. c, ok := f.Expr.(*ast.Call)
  501. if !ok {
  502. // never happen
  503. return nil, fmt.Errorf("expr %v is not ast.Call", stmt.Fields[0].Expr)
  504. }
  505. var name string
  506. if f.AName != "" {
  507. name = f.AName
  508. } else {
  509. name = f.Name
  510. }
  511. return &operator.FuncOp{CallExpr: c, Name: name, IsAgg: function.IsAggFunc(name)}, nil
  512. }
  513. func parseFilter(props map[string]interface{}) (*operator.FilterOp, error) {
  514. m, ok := props["expr"]
  515. if !ok {
  516. return nil, errors.New("no expr")
  517. }
  518. conditionExpr, ok := m.(string)
  519. if !ok {
  520. return nil, fmt.Errorf("expr %v is not string", m)
  521. }
  522. p := xsql.NewParser(strings.NewReader("where " + conditionExpr))
  523. if exp, err := p.ParseCondition(); err != nil {
  524. return nil, err
  525. } else {
  526. if exp != nil {
  527. return &operator.FilterOp{Condition: exp}, nil
  528. }
  529. }
  530. return nil, fmt.Errorf("expr %v is not a condition", m)
  531. }
  532. func parseHaving(props map[string]interface{}) (*operator.HavingOp, error) {
  533. m, ok := props["expr"]
  534. if !ok {
  535. return nil, errors.New("no expr")
  536. }
  537. conditionExpr, ok := m.(string)
  538. if !ok {
  539. return nil, fmt.Errorf("expr %v is not string", m)
  540. }
  541. p := xsql.NewParser(strings.NewReader("where " + conditionExpr))
  542. if exp, err := p.ParseCondition(); err != nil {
  543. return nil, err
  544. } else {
  545. if exp != nil {
  546. return &operator.HavingOp{Condition: exp}, nil
  547. }
  548. }
  549. return nil, fmt.Errorf("expr %v is not a condition", m)
  550. }
  551. func parseSwitch(props map[string]interface{}) (*node.SwitchConfig, error) {
  552. n := &graph.Switch{}
  553. err := cast.MapToStruct(props, n)
  554. if err != nil {
  555. return nil, err
  556. }
  557. if len(n.Cases) == 0 {
  558. return nil, fmt.Errorf("switch node must have at least one case")
  559. }
  560. caseExprs := make([]ast.Expr, len(n.Cases))
  561. for i, c := range n.Cases {
  562. p := xsql.NewParser(strings.NewReader("where " + c))
  563. if exp, err := p.ParseCondition(); err != nil {
  564. return nil, fmt.Errorf("parse case %d error: %v", i, err)
  565. } else {
  566. if exp != nil {
  567. caseExprs[i] = exp
  568. }
  569. }
  570. }
  571. return &node.SwitchConfig{
  572. Cases: caseExprs,
  573. StopAtFirstMatch: n.StopAtFirstMatch,
  574. }, nil
  575. }