// planner_graph.go (15 KB)

  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "errors"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/binder/function"
  19. "github.com/lf-edge/ekuiper/internal/topo"
  20. "github.com/lf-edge/ekuiper/internal/topo/graph"
  21. "github.com/lf-edge/ekuiper/internal/topo/node"
  22. "github.com/lf-edge/ekuiper/internal/topo/operator"
  23. "github.com/lf-edge/ekuiper/internal/xsql"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. "github.com/lf-edge/ekuiper/pkg/ast"
  26. "github.com/lf-edge/ekuiper/pkg/cast"
  27. "github.com/lf-edge/ekuiper/pkg/message"
  28. "strings"
  29. )
  30. // PlanByGraph returns a topo.Topo object by a graph
  31. func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
  32. ruleGraph := rule.Graph
  33. if ruleGraph == nil {
  34. return nil, errors.New("no graph")
  35. }
  36. tp, err := topo.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval)
  37. if err != nil {
  38. return nil, err
  39. }
  40. var (
  41. nodeMap = make(map[string]api.TopNode)
  42. sinks = make(map[string]bool)
  43. sources = make(map[string]bool)
  44. )
  45. for nodeName, gn := range ruleGraph.Nodes {
  46. switch gn.Type {
  47. case "source":
  48. if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
  49. return nil, fmt.Errorf("no edge defined for source node %s", nodeName)
  50. }
  51. sourceType, ok := gn.Props["source_type"]
  52. if !ok {
  53. sourceType = "stream"
  54. }
  55. st, ok := sourceType.(string)
  56. if !ok {
  57. return nil, fmt.Errorf("source_type %v is not string", sourceType)
  58. }
  59. st = strings.ToLower(st)
  60. sourceOption := &ast.Options{}
  61. err := cast.MapToStruct(gn.Props, sourceOption)
  62. if err != nil {
  63. return nil, err
  64. }
  65. sourceOption.TYPE = gn.NodeType
  66. switch st {
  67. case "stream":
  68. // TODO deal with conf key
  69. pp, err := operator.NewPreprocessor(true, nil, true, nil, rule.Options.IsEventTime, sourceOption.TIMESTAMP, sourceOption.TIMESTAMP_FORMAT, strings.EqualFold(sourceOption.FORMAT, message.FormatBinary), sourceOption.STRICT_VALIDATION)
  70. if err != nil {
  71. return nil, err
  72. }
  73. srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError)
  74. nodeMap[nodeName] = srcNode
  75. tp.AddSrc(srcNode)
  76. case "table":
  77. // TODO add table
  78. return nil, fmt.Errorf("table source is not supported yet")
  79. default:
  80. return nil, fmt.Errorf("unknown source type %s", st)
  81. }
  82. sources[nodeName] = true
  83. case "sink":
  84. if _, ok := ruleGraph.Topo.Edges[nodeName]; ok {
  85. return nil, fmt.Errorf("sink %s has edge", nodeName)
  86. }
  87. nodeMap[nodeName] = node.NewSinkNode(nodeName, gn.NodeType, gn.Props)
  88. sinks[nodeName] = true
  89. case "operator":
  90. if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
  91. return nil, fmt.Errorf("no edge defined for operator node %s", nodeName)
  92. }
  93. switch strings.ToLower(gn.NodeType) {
  94. case "function":
  95. fop, err := parseFunc(gn.Props)
  96. if err != nil {
  97. return nil, err
  98. }
  99. op := Transform(fop, nodeName, rule.Options)
  100. nodeMap[nodeName] = op
  101. case "aggfunc":
  102. fop, err := parseFunc(gn.Props)
  103. if err != nil {
  104. return nil, err
  105. }
  106. fop.IsAgg = true
  107. op := Transform(fop, nodeName, rule.Options)
  108. nodeMap[nodeName] = op
  109. case "filter":
  110. fop, err := parseFilter(gn.Props)
  111. if err != nil {
  112. return nil, err
  113. }
  114. op := Transform(fop, nodeName, rule.Options)
  115. nodeMap[nodeName] = op
  116. case "pick":
  117. pop, err := parsePick(gn.Props)
  118. if err != nil {
  119. return nil, err
  120. }
  121. op := Transform(pop, nodeName, rule.Options)
  122. nodeMap[nodeName] = op
  123. case "window":
  124. wconf, err := parseWindow(gn.Props)
  125. if err != nil {
  126. return nil, err
  127. }
  128. op, err := node.NewWindowOp(nodeName, *wconf, ruleGraph.Topo.Sources, rule.Options)
  129. if err != nil {
  130. return nil, err
  131. }
  132. nodeMap[nodeName] = op
  133. case "join":
  134. jop, err := parseJoin(gn.Props)
  135. if err != nil {
  136. return nil, err
  137. }
  138. op := Transform(jop, nodeName, rule.Options)
  139. nodeMap[nodeName] = op
  140. case "groupby":
  141. gop, err := parseGroupBy(gn.Props)
  142. if err != nil {
  143. return nil, err
  144. }
  145. op := Transform(gop, nodeName, rule.Options)
  146. nodeMap[nodeName] = op
  147. case "orderby":
  148. oop, err := parseOrderBy(gn.Props)
  149. if err != nil {
  150. return nil, err
  151. }
  152. op := Transform(oop, nodeName, rule.Options)
  153. nodeMap[nodeName] = op
  154. default: // TODO other node type
  155. return nil, fmt.Errorf("unknown operator type %s", gn.NodeType)
  156. }
  157. default:
  158. return nil, fmt.Errorf("unknown node type %s", gn.Type)
  159. }
  160. }
  161. // validate source node
  162. for _, nodeName := range ruleGraph.Topo.Sources {
  163. if _, ok := sources[nodeName]; !ok {
  164. return nil, fmt.Errorf("source %s is not a source type node", nodeName)
  165. }
  166. }
  167. // reverse edges
  168. reversedEdges := make(map[string][]string)
  169. rclone := make(map[string][]string)
  170. for fromNode, toNodes := range ruleGraph.Topo.Edges {
  171. if _, ok := ruleGraph.Nodes[fromNode]; !ok {
  172. return nil, fmt.Errorf("node %s is not defined", fromNode)
  173. }
  174. for _, toNode := range toNodes {
  175. if _, ok := ruleGraph.Nodes[toNode]; !ok {
  176. return nil, fmt.Errorf("node %s is not defined", toNode)
  177. }
  178. reversedEdges[toNode] = append(reversedEdges[toNode], fromNode)
  179. rclone[toNode] = append(rclone[toNode], fromNode)
  180. }
  181. }
  182. // sort the nodes by topological order
  183. nodesInOrder := make([]string, len(ruleGraph.Nodes))
  184. i := 0
  185. genNodesInOrder(ruleGraph.Topo.Sources, ruleGraph.Topo.Edges, rclone, nodesInOrder, i)
  186. // validate the typo
  187. // the map is to record the output for each node
  188. dataFlow := make(map[string]*graph.IOType)
  189. for _, n := range nodesInOrder {
  190. gn := ruleGraph.Nodes[n]
  191. if gn == nil {
  192. return nil, fmt.Errorf("can't find node %s", n)
  193. }
  194. if gn.Type == "source" {
  195. dataFlow[n] = &graph.IOType{
  196. Type: graph.IOINPUT_TYPE_ROW,
  197. RowType: graph.IOROW_TYPE_SINGLE,
  198. CollectionType: graph.IOCOLLECTION_TYPE_ANY,
  199. AllowMulti: false,
  200. }
  201. } else if gn.Type == "sink" {
  202. continue
  203. } else {
  204. nodeIO, ok := graph.OpIO[strings.ToLower(gn.NodeType)]
  205. if !ok {
  206. return nil, fmt.Errorf("can't find the io definiton for node type %s", gn.NodeType)
  207. }
  208. dataInCondition := nodeIO[0]
  209. innodes := reversedEdges[n]
  210. if len(innodes) > 1 {
  211. if dataInCondition.AllowMulti {
  212. for _, innode := range innodes {
  213. _, err = graph.Fit(dataFlow[innode], dataInCondition)
  214. if err != nil {
  215. return nil, fmt.Errorf("node %s output does not match node %s input: %v", innode, n, err)
  216. }
  217. }
  218. } else {
  219. return nil, fmt.Errorf("operator %s of type %s does not allow multiple inputs", n, gn.NodeType)
  220. }
  221. } else if len(innodes) == 1 {
  222. _, err := graph.Fit(dataFlow[innodes[0]], dataInCondition)
  223. if err != nil {
  224. return nil, fmt.Errorf("node %s output does not match node %s input: %v", innodes[0], n, err)
  225. }
  226. } else {
  227. return nil, fmt.Errorf("operator %s of type %s has no input", n, gn.NodeType)
  228. }
  229. out := nodeIO[1]
  230. in := dataFlow[innodes[0]]
  231. dataFlow[n] = graph.MapOut(in, out)
  232. // convert filter to having if the input is aggregated
  233. if gn.NodeType == "filter" && in.Type == graph.IOINPUT_TYPE_COLLECTION && in.CollectionType == graph.IOCOLLECTION_TYPE_GROUPED {
  234. fop, err := parseHaving(gn.Props)
  235. if err != nil {
  236. return nil, err
  237. }
  238. op := Transform(fop, n, rule.Options)
  239. nodeMap[n] = op
  240. }
  241. }
  242. }
  243. // add the linkages
  244. for nodeName, fromNodes := range reversedEdges {
  245. inputs := make([]api.Emitter, len(fromNodes))
  246. for i, fromNode := range fromNodes {
  247. inputs[i] = nodeMap[fromNode].(api.Emitter)
  248. }
  249. n := nodeMap[nodeName]
  250. if n == nil {
  251. return nil, fmt.Errorf("node %s is not defined", nodeName)
  252. }
  253. if _, ok := sinks[nodeName]; ok {
  254. tp.AddSink(inputs, n.(*node.SinkNode))
  255. } else {
  256. tp.AddOperator(inputs, n.(node.OperatorNode))
  257. }
  258. }
  259. return tp, nil
  260. }
  261. func genNodesInOrder(toNodes []string, edges map[string][]string, reversedEdges map[string][]string, nodesInOrder []string, i int) int {
  262. for _, src := range toNodes {
  263. if len(reversedEdges[src]) > 1 {
  264. reversedEdges[src] = reversedEdges[src][1:]
  265. continue
  266. }
  267. nodesInOrder[i] = src
  268. i++
  269. i = genNodesInOrder(edges[src], edges, reversedEdges, nodesInOrder, i)
  270. }
  271. return i
  272. }
  273. func parseOrderBy(props map[string]interface{}) (*operator.OrderOp, error) {
  274. n := &graph.Orderby{}
  275. err := cast.MapToStruct(props, n)
  276. if err != nil {
  277. return nil, err
  278. }
  279. stmt := "SELECT * FROM unknown ORDER BY"
  280. for _, s := range n.Sorts {
  281. stmt += " " + s.Field + " "
  282. if s.Desc {
  283. stmt += "DESC"
  284. }
  285. }
  286. p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
  287. if err != nil {
  288. return nil, fmt.Errorf("invalid order by statement error: %v", err)
  289. }
  290. if len(p.SortFields) == 0 {
  291. return nil, fmt.Errorf("order by statement is empty")
  292. }
  293. return &operator.OrderOp{
  294. SortFields: p.SortFields,
  295. }, nil
  296. }
  297. func parseGroupBy(props map[string]interface{}) (*operator.AggregateOp, error) {
  298. n := &graph.Groupby{}
  299. err := cast.MapToStruct(props, n)
  300. if err != nil {
  301. return nil, err
  302. }
  303. if len(n.Dimensions) == 0 {
  304. return nil, fmt.Errorf("groupby must have at least one dimension")
  305. }
  306. stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
  307. p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
  308. if err != nil {
  309. return nil, fmt.Errorf("invalid join statement error: %v", err)
  310. }
  311. return &operator.AggregateOp{Dimensions: p.Dimensions}, nil
  312. }
  313. func parseJoin(props map[string]interface{}) (*operator.JoinOp, error) {
  314. n := &graph.Join{}
  315. err := cast.MapToStruct(props, n)
  316. if err != nil {
  317. return nil, err
  318. }
  319. stmt := "SELECT * FROM " + n.From
  320. for _, join := range n.Joins {
  321. stmt += " " + join.Type + " JOIN ON " + join.On
  322. }
  323. p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
  324. if err != nil {
  325. return nil, fmt.Errorf("invalid join statement error: %v", err)
  326. }
  327. return &operator.JoinOp{Joins: p.Joins, From: p.Sources[0].(*ast.Table)}, nil
  328. }
  329. func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
  330. n := &graph.Window{}
  331. err := cast.MapToStruct(props, n)
  332. if err != nil {
  333. return nil, err
  334. }
  335. if n.Size <= 0 {
  336. return nil, fmt.Errorf("window size %d is invalid", n.Size)
  337. }
  338. var (
  339. wt ast.WindowType
  340. length int
  341. interval int
  342. )
  343. switch strings.ToLower(n.Type) {
  344. case "tumblingwindow":
  345. wt = ast.TUMBLING_WINDOW
  346. if n.Interval != 0 && n.Interval != n.Size {
  347. return nil, fmt.Errorf("tumbling window interval must equal to size")
  348. }
  349. case "hoppingwindow":
  350. wt = ast.HOPPING_WINDOW
  351. if n.Interval <= 0 {
  352. return nil, fmt.Errorf("hopping window interval must be greater than 0")
  353. }
  354. if n.Interval > n.Size {
  355. return nil, fmt.Errorf("hopping window interval must be less than size")
  356. }
  357. case "sessionwindow":
  358. wt = ast.SESSION_WINDOW
  359. if n.Interval <= 0 {
  360. return nil, fmt.Errorf("hopping window interval must be greater than 0")
  361. }
  362. case "slidingwindow":
  363. wt = ast.SLIDING_WINDOW
  364. if n.Interval != 0 && n.Interval != n.Size {
  365. return nil, fmt.Errorf("tumbling window interval must equal to size")
  366. }
  367. case "countwindow":
  368. wt = ast.COUNT_WINDOW
  369. if n.Interval < 0 {
  370. return nil, fmt.Errorf("count window interval must be greater or equal to 0")
  371. }
  372. if n.Interval > n.Size {
  373. return nil, fmt.Errorf("count window interval must be less than size")
  374. }
  375. if n.Interval == 0 {
  376. n.Interval = n.Size
  377. }
  378. default:
  379. return nil, fmt.Errorf("unknown window type %s", n.Type)
  380. }
  381. if wt == ast.COUNT_WINDOW {
  382. length = n.Size
  383. interval = n.Interval
  384. } else {
  385. var unit = 1
  386. switch strings.ToLower(n.Unit) {
  387. case "dd":
  388. unit = 24 * 3600 * 1000
  389. case "hh":
  390. unit = 3600 * 1000
  391. case "mi":
  392. unit = 60 * 1000
  393. case "ss":
  394. unit = 1000
  395. case "ms":
  396. unit = 1
  397. default:
  398. return nil, fmt.Errorf("Invalid unit %s", n.Unit)
  399. }
  400. length = n.Size * unit
  401. interval = n.Interval * unit
  402. }
  403. return &node.WindowConfig{
  404. Type: wt,
  405. Length: length,
  406. Interval: interval,
  407. }, nil
  408. }
  409. func parsePick(props map[string]interface{}) (*operator.ProjectOp, error) {
  410. n := &graph.Select{}
  411. err := cast.MapToStruct(props, n)
  412. if err != nil {
  413. return nil, err
  414. }
  415. stmt, err := xsql.NewParser(strings.NewReader("select " + strings.Join(n.Fields, ",") + " from nonexist")).Parse()
  416. if err != nil {
  417. return nil, err
  418. }
  419. t := ProjectPlan{
  420. fields: stmt.Fields,
  421. isAggregate: xsql.IsAggStatement(stmt),
  422. }.Init()
  423. return &operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, nil
  424. }
  425. func parseFunc(props map[string]interface{}) (*operator.FuncOp, error) {
  426. m, ok := props["expr"]
  427. if !ok {
  428. return nil, errors.New("no expr")
  429. }
  430. funcExpr, ok := m.(string)
  431. if !ok {
  432. return nil, fmt.Errorf("expr %v is not string", m)
  433. }
  434. stmt, err := xsql.NewParser(strings.NewReader("select " + funcExpr + " from nonexist")).Parse()
  435. if err != nil {
  436. return nil, err
  437. }
  438. f := stmt.Fields[0]
  439. c, ok := f.Expr.(*ast.Call)
  440. if !ok {
  441. // never happen
  442. return nil, fmt.Errorf("expr %v is not ast.Call", stmt.Fields[0].Expr)
  443. }
  444. var name string
  445. if f.AName != "" {
  446. name = f.AName
  447. } else {
  448. name = f.Name
  449. }
  450. return &operator.FuncOp{CallExpr: c, Name: name, IsAgg: function.IsAggFunc(name)}, nil
  451. }
  452. func parseFilter(props map[string]interface{}) (*operator.FilterOp, error) {
  453. m, ok := props["expr"]
  454. if !ok {
  455. return nil, errors.New("no expr")
  456. }
  457. conditionExpr, ok := m.(string)
  458. if !ok {
  459. return nil, fmt.Errorf("expr %v is not string", m)
  460. }
  461. p := xsql.NewParser(strings.NewReader("where " + conditionExpr))
  462. if exp, err := p.ParseCondition(); err != nil {
  463. return nil, err
  464. } else {
  465. if exp != nil {
  466. return &operator.FilterOp{Condition: exp}, nil
  467. }
  468. }
  469. return nil, fmt.Errorf("expr %v is not a condition", m)
  470. }
  471. func parseHaving(props map[string]interface{}) (*operator.HavingOp, error) {
  472. m, ok := props["expr"]
  473. if !ok {
  474. return nil, errors.New("no expr")
  475. }
  476. conditionExpr, ok := m.(string)
  477. if !ok {
  478. return nil, fmt.Errorf("expr %v is not string", m)
  479. }
  480. p := xsql.NewParser(strings.NewReader("where " + conditionExpr))
  481. if exp, err := p.ParseCondition(); err != nil {
  482. return nil, err
  483. } else {
  484. if exp != nil {
  485. return &operator.HavingOp{Condition: exp}, nil
  486. }
  487. }
  488. return nil, fmt.Errorf("expr %v is not a condition", m)
  489. }