planner_graph.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "errors"
  17. "fmt"
  18. "strings"
  19. "github.com/lf-edge/ekuiper/internal/binder/function"
  20. store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
  21. "github.com/lf-edge/ekuiper/internal/topo"
  22. "github.com/lf-edge/ekuiper/internal/topo/graph"
  23. "github.com/lf-edge/ekuiper/internal/topo/node"
  24. "github.com/lf-edge/ekuiper/internal/topo/operator"
  25. "github.com/lf-edge/ekuiper/internal/xsql"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. "github.com/lf-edge/ekuiper/pkg/ast"
  28. "github.com/lf-edge/ekuiper/pkg/cast"
  29. "github.com/lf-edge/ekuiper/pkg/kv"
  30. "github.com/lf-edge/ekuiper/pkg/message"
  31. )
  32. type genNodeFunc func(name string, props map[string]interface{}, options *api.RuleOption) (api.TopNode, error)
  33. var extNodes = map[string]genNodeFunc{}
  34. type sourceType int
  35. const (
  36. ILLEGAL sourceType = iota
  37. STREAM
  38. SCANTABLE
  39. LOOKUPTABLE
  40. )
  41. // PlanByGraph returns a topo.Topo object by a graph
  42. func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
  43. ruleGraph := rule.Graph
  44. if ruleGraph == nil {
  45. return nil, errors.New("no graph")
  46. }
  47. tp, err := topo.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval)
  48. if err != nil {
  49. return nil, err
  50. }
  51. var (
  52. nodeMap = make(map[string]api.TopNode)
  53. sinks = make(map[string]bool)
  54. sources = make(map[string]bool)
  55. store kv.KeyValue
  56. lookupTableChildren = make(map[string]*ast.Options)
  57. scanTableEmitters []string
  58. sourceNames []string
  59. streamEmitters = make(map[string]struct{})
  60. )
  61. for _, srcName := range ruleGraph.Topo.Sources {
  62. gn, ok := ruleGraph.Nodes[srcName]
  63. if !ok {
  64. return nil, fmt.Errorf("source node %s not defined", srcName)
  65. }
  66. if _, ok := ruleGraph.Topo.Edges[srcName]; !ok {
  67. return nil, fmt.Errorf("no edge defined for source node %s", srcName)
  68. }
  69. srcNode, srcType, name, err := parseSource(srcName, gn, rule, store, lookupTableChildren)
  70. if err != nil {
  71. return nil, fmt.Errorf("parse source %s with %v error: %w", srcName, gn.Props, err)
  72. }
  73. switch srcType {
  74. case STREAM:
  75. streamEmitters[name] = struct{}{}
  76. sourceNames = append(sourceNames, name)
  77. case SCANTABLE:
  78. scanTableEmitters = append(scanTableEmitters, name)
  79. sourceNames = append(sourceNames, name)
  80. case LOOKUPTABLE:
  81. sourceNames = append(sourceNames, name)
  82. }
  83. if srcNode != nil {
  84. nodeMap[srcName] = srcNode
  85. tp.AddSrc(srcNode)
  86. }
  87. sources[srcName] = true
  88. }
  89. for nodeName, gn := range ruleGraph.Nodes {
  90. switch gn.Type {
  91. case "source": // handled above,
  92. continue
  93. case "sink":
  94. if _, ok := ruleGraph.Topo.Edges[nodeName]; ok {
  95. return nil, fmt.Errorf("sink %s has edge", nodeName)
  96. }
  97. nodeMap[nodeName] = node.NewSinkNode(nodeName, gn.NodeType, gn.Props)
  98. sinks[nodeName] = true
  99. case "operator":
  100. if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
  101. return nil, fmt.Errorf("no edge defined for operator node %s", nodeName)
  102. }
  103. nt := strings.ToLower(gn.NodeType)
  104. switch nt {
  105. case "function":
  106. fop, err := parseFunc(gn.Props, sourceNames)
  107. if err != nil {
  108. return nil, fmt.Errorf("parse function %s with %v error: %w", nodeName, gn.Props, err)
  109. }
  110. op := Transform(fop, nodeName, rule.Options)
  111. nodeMap[nodeName] = op
  112. case "aggfunc":
  113. fop, err := parseFunc(gn.Props, sourceNames)
  114. if err != nil {
  115. return nil, fmt.Errorf("parse aggfunc %s with %v error: %w", nodeName, gn.Props, err)
  116. }
  117. fop.IsAgg = true
  118. op := Transform(fop, nodeName, rule.Options)
  119. nodeMap[nodeName] = op
  120. case "filter":
  121. fop, err := parseFilter(gn.Props, sourceNames)
  122. if err != nil {
  123. return nil, fmt.Errorf("parse filter %s with %v error: %w", nodeName, gn.Props, err)
  124. }
  125. op := Transform(fop, nodeName, rule.Options)
  126. nodeMap[nodeName] = op
  127. case "pick":
  128. pop, err := parsePick(gn.Props, sourceNames)
  129. if err != nil {
  130. return nil, fmt.Errorf("parse pick %s with %v error: %w", nodeName, gn.Props, err)
  131. }
  132. op := Transform(pop, nodeName, rule.Options)
  133. nodeMap[nodeName] = op
  134. case "window":
  135. wconf, err := parseWindow(gn.Props)
  136. if err != nil {
  137. return nil, fmt.Errorf("parse window conf %s with %v error: %w", nodeName, gn.Props, err)
  138. }
  139. op, err := node.NewWindowOp(nodeName, *wconf, ruleGraph.Topo.Sources, rule.Options)
  140. if err != nil {
  141. return nil, fmt.Errorf("parse window %s with %v error: %w", nodeName, gn.Props, err)
  142. }
  143. nodeMap[nodeName] = op
  144. case "join":
  145. stmt, err := parseJoinAst(gn.Props, sourceNames)
  146. if err != nil {
  147. return nil, fmt.Errorf("parse join %s with %v error: %w", nodeName, gn.Props, err)
  148. }
  149. fromNode := stmt.Sources[0].(*ast.Table)
  150. if _, ok := streamEmitters[fromNode.Name]; !ok {
  151. return nil, fmt.Errorf("parse join %s with %v error: join source %s is not a stream", nodeName, gn.Props, fromNode.Name)
  152. }
  153. hasLookup := false
  154. if stmt.Joins != nil {
  155. if len(lookupTableChildren) > 0 {
  156. var joins []ast.Join
  157. for _, join := range stmt.Joins {
  158. if hasLookup {
  159. return nil, fmt.Errorf("parse join %s with %v error: only support to join one lookup table with one stream", nodeName, gn.Props)
  160. }
  161. if streamOpt, ok := lookupTableChildren[join.Name]; ok {
  162. hasLookup = true
  163. lookupPlan := LookupPlan{
  164. joinExpr: join,
  165. options: streamOpt,
  166. }
  167. if !lookupPlan.validateAndExtractCondition() {
  168. return nil, fmt.Errorf("parse join %s with %v error: join condition %s is invalid, at least one equi-join predicate is required", nodeName, gn.Props, join.Expr)
  169. }
  170. op, err := node.NewLookupNode(lookupPlan.joinExpr.Name, lookupPlan.fields, lookupPlan.keys, lookupPlan.joinExpr.JoinType, lookupPlan.valvars, lookupPlan.options, rule.Options)
  171. if err != nil {
  172. return nil, fmt.Errorf("parse join %s with %v error: fail to create lookup node", nodeName, gn.Props)
  173. }
  174. nodeMap[nodeName] = op
  175. } else {
  176. joins = append(joins, join)
  177. }
  178. }
  179. stmt.Joins = joins
  180. }
  181. // Not all joins are lookup joins, so we need to create a join plan for the remaining joins
  182. if len(stmt.Joins) > 0 && !hasLookup {
  183. if len(scanTableEmitters) > 0 {
  184. return nil, fmt.Errorf("parse join %s with %v error: do not support scan table %s yet", nodeName, gn.Props, scanTableEmitters)
  185. }
  186. jop := &operator.JoinOp{Joins: stmt.Joins, From: fromNode}
  187. op := Transform(jop, nodeName, rule.Options)
  188. nodeMap[nodeName] = op
  189. }
  190. }
  191. case "groupby":
  192. gop, err := parseGroupBy(gn.Props, sourceNames)
  193. if err != nil {
  194. return nil, fmt.Errorf("parse groupby %s with %v error: %w", nodeName, gn.Props, err)
  195. }
  196. op := Transform(gop, nodeName, rule.Options)
  197. nodeMap[nodeName] = op
  198. case "orderby":
  199. oop, err := parseOrderBy(gn.Props, sourceNames)
  200. if err != nil {
  201. return nil, fmt.Errorf("parse orderby %s with %v error: %w", nodeName, gn.Props, err)
  202. }
  203. op := Transform(oop, nodeName, rule.Options)
  204. nodeMap[nodeName] = op
  205. case "switch":
  206. sconf, err := parseSwitch(gn.Props, sourceNames)
  207. if err != nil {
  208. return nil, fmt.Errorf("parse switch %s with %v error: %w", nodeName, gn.Props, err)
  209. }
  210. op, err := node.NewSwitchNode(nodeName, sconf, rule.Options)
  211. if err != nil {
  212. return nil, fmt.Errorf("create switch %s with %v error: %w", nodeName, gn.Props, err)
  213. }
  214. nodeMap[nodeName] = op
  215. default:
  216. gnf, ok := extNodes[nt]
  217. if !ok {
  218. return nil, fmt.Errorf("unknown operator type %s", gn.NodeType)
  219. }
  220. op, err := gnf(nodeName, gn.Props, rule.Options)
  221. if err != nil {
  222. return nil, err
  223. }
  224. nodeMap[nodeName] = op
  225. }
  226. default:
  227. return nil, fmt.Errorf("unknown node type %s", gn.Type)
  228. }
  229. }
  230. // validate source node
  231. for _, nodeName := range ruleGraph.Topo.Sources {
  232. if _, ok := sources[nodeName]; !ok {
  233. return nil, fmt.Errorf("source %s is not a source type node", nodeName)
  234. }
  235. }
  236. // reverse edges, value is a 2-dim array. Only switch node will have the second dim
  237. reversedEdges := make(map[string][][]string)
  238. rclone := make(map[string][]string)
  239. for fromNode, toNodes := range ruleGraph.Topo.Edges {
  240. if _, ok := ruleGraph.Nodes[fromNode]; !ok {
  241. return nil, fmt.Errorf("node %s is not defined", fromNode)
  242. }
  243. for i, toNode := range toNodes {
  244. switch tn := toNode.(type) {
  245. case string:
  246. if _, ok := ruleGraph.Nodes[tn]; !ok {
  247. return nil, fmt.Errorf("node %s is not defined", tn)
  248. }
  249. if _, ok := reversedEdges[tn]; !ok {
  250. reversedEdges[tn] = make([][]string, 1)
  251. }
  252. reversedEdges[tn][0] = append(reversedEdges[tn][0], fromNode)
  253. rclone[tn] = append(rclone[tn], fromNode)
  254. case []interface{}:
  255. for _, tni := range tn {
  256. tnn, ok := tni.(string)
  257. if !ok { // never happen
  258. return nil, fmt.Errorf("invalid edge toNode %v", toNode)
  259. }
  260. if _, ok := ruleGraph.Nodes[tnn]; !ok {
  261. return nil, fmt.Errorf("node %s is not defined", tnn)
  262. }
  263. for len(reversedEdges[tnn]) <= i {
  264. reversedEdges[tnn] = append(reversedEdges[tnn], []string{})
  265. }
  266. reversedEdges[tnn][i] = append(reversedEdges[tnn][i], fromNode)
  267. rclone[tnn] = append(rclone[tnn], fromNode)
  268. }
  269. }
  270. }
  271. }
  272. // sort the nodes by topological order
  273. nodesInOrder := make([]string, len(ruleGraph.Nodes))
  274. i := 0
  275. genNodesInOrder(ruleGraph.Topo.Sources, ruleGraph.Topo.Edges, rclone, nodesInOrder, i)
  276. // validate the typo
  277. // the map is to record the output for each node
  278. dataFlow := make(map[string]*graph.IOType)
  279. for _, n := range nodesInOrder {
  280. gn := ruleGraph.Nodes[n]
  281. if gn == nil {
  282. return nil, fmt.Errorf("can't find node %s", n)
  283. }
  284. if gn.Type == "source" {
  285. dataFlow[n] = &graph.IOType{
  286. Type: graph.IOINPUT_TYPE_ROW,
  287. RowType: graph.IOROW_TYPE_SINGLE,
  288. CollectionType: graph.IOCOLLECTION_TYPE_ANY,
  289. AllowMulti: false,
  290. }
  291. } else if gn.Type == "sink" {
  292. continue
  293. } else {
  294. nodeIO, ok := graph.OpIO[strings.ToLower(gn.NodeType)]
  295. if !ok {
  296. return nil, fmt.Errorf("can't find the io definition for node type %s", gn.NodeType)
  297. }
  298. dataInCondition := nodeIO[0]
  299. indim := reversedEdges[n]
  300. var innodes []string
  301. for _, in := range indim {
  302. innodes = append(innodes, in...)
  303. }
  304. if len(innodes) > 1 {
  305. if dataInCondition.AllowMulti {
  306. // special case for join which does not allow multiple streams
  307. if gn.NodeType == "join" {
  308. joinStreams := 0
  309. for _, innode := range innodes {
  310. if _, isLookup := lookupTableChildren[innode]; !isLookup {
  311. joinStreams++
  312. }
  313. if joinStreams > 1 {
  314. return nil, fmt.Errorf("join node %s does not allow multiple stream inputs", n)
  315. }
  316. }
  317. }
  318. for _, innode := range innodes {
  319. _, err = graph.Fit(dataFlow[innode], dataInCondition)
  320. if err != nil {
  321. return nil, fmt.Errorf("node %s output does not match node %s input: %v", innode, n, err)
  322. }
  323. }
  324. } else {
  325. return nil, fmt.Errorf("operator %s of type %s does not allow multiple inputs", n, gn.NodeType)
  326. }
  327. } else if len(innodes) == 1 {
  328. _, err := graph.Fit(dataFlow[innodes[0]], dataInCondition)
  329. if err != nil {
  330. return nil, fmt.Errorf("node %s output does not match node %s input: %v", innodes[0], n, err)
  331. }
  332. } else {
  333. return nil, fmt.Errorf("operator %s of type %s has no input", n, gn.NodeType)
  334. }
  335. out := nodeIO[1]
  336. in := dataFlow[innodes[0]]
  337. dataFlow[n] = graph.MapOut(in, out)
  338. // convert filter to having if the input is aggregated
  339. if gn.NodeType == "filter" && in.Type == graph.IOINPUT_TYPE_COLLECTION && in.CollectionType == graph.IOCOLLECTION_TYPE_GROUPED {
  340. fop, err := parseHaving(gn.Props, sourceNames)
  341. if err != nil {
  342. return nil, err
  343. }
  344. op := Transform(fop, n, rule.Options)
  345. nodeMap[n] = op
  346. }
  347. }
  348. }
  349. // add the linkages
  350. for nodeName, fromNodes := range reversedEdges {
  351. totalLen := 0
  352. for _, fromNode := range fromNodes {
  353. totalLen += len(fromNode)
  354. }
  355. inputs := make([]api.Emitter, 0, totalLen)
  356. for i, fromNode := range fromNodes {
  357. for _, from := range fromNode {
  358. if i == 0 {
  359. if src, ok := nodeMap[from].(api.Emitter); ok {
  360. inputs = append(inputs, src)
  361. }
  362. } else {
  363. switch sn := nodeMap[from].(type) {
  364. case *node.SwitchNode:
  365. inputs = append(inputs, sn.GetEmitter(i))
  366. default:
  367. return nil, fmt.Errorf("node %s is not a switch node but have multiple output", from)
  368. }
  369. }
  370. }
  371. }
  372. n := nodeMap[nodeName]
  373. if n == nil {
  374. return nil, fmt.Errorf("node %s is not defined", nodeName)
  375. }
  376. if _, ok := sinks[nodeName]; ok {
  377. tp.AddSink(inputs, n.(*node.SinkNode))
  378. } else {
  379. tp.AddOperator(inputs, n.(node.OperatorNode))
  380. }
  381. }
  382. return tp, nil
  383. }
  384. func genNodesInOrder(toNodes []string, edges map[string][]interface{}, flatReversedEdges map[string][]string, nodesInOrder []string, i int) int {
  385. for _, src := range toNodes {
  386. if len(flatReversedEdges[src]) > 1 {
  387. flatReversedEdges[src] = flatReversedEdges[src][1:]
  388. continue
  389. }
  390. nodesInOrder[i] = src
  391. i++
  392. tns := make([]string, 0, len(edges[src]))
  393. for _, toNode := range edges[src] {
  394. switch toNode.(type) {
  395. case string:
  396. tns = append(tns, toNode.(string))
  397. case []interface{}:
  398. for _, tni := range toNode.([]interface{}) {
  399. tns = append(tns, tni.(string))
  400. }
  401. }
  402. }
  403. i = genNodesInOrder(tns, edges, flatReversedEdges, nodesInOrder, i)
  404. }
  405. return i
  406. }
  407. func parseSource(nodeName string, gn *api.GraphNode, rule *api.Rule, store kv.KeyValue, lookupTableChildren map[string]*ast.Options) (*node.SourceNode, sourceType, string, error) {
  408. sourceMeta := &api.SourceMeta{
  409. SourceType: "stream",
  410. }
  411. err := cast.MapToStruct(gn.Props, sourceMeta)
  412. if err != nil {
  413. return nil, ILLEGAL, "", err
  414. }
  415. if sourceMeta.SourceType != "stream" && sourceMeta.SourceType != "table" {
  416. return nil, ILLEGAL, "", fmt.Errorf("source type %s not supported", sourceMeta.SourceType)
  417. }
  418. // If source name is specified, find the created stream/table from store
  419. if sourceMeta.SourceName != "" {
  420. if store == nil {
  421. store, err = store2.GetKV("stream")
  422. if err != nil {
  423. return nil, ILLEGAL, "", err
  424. }
  425. }
  426. streamStmt, e := xsql.GetDataSource(store, sourceMeta.SourceName)
  427. if e != nil {
  428. return nil, ILLEGAL, "", fmt.Errorf("fail to get stream %s, please check if stream is created", sourceMeta.SourceName)
  429. }
  430. if streamStmt.StreamType == ast.TypeStream && sourceMeta.SourceType == "table" {
  431. return nil, ILLEGAL, "", fmt.Errorf("stream %s is not a table", sourceMeta.SourceName)
  432. } else if streamStmt.StreamType == ast.TypeTable && sourceMeta.SourceType == "stream" {
  433. return nil, ILLEGAL, "", fmt.Errorf("table %s is not a stream", sourceMeta.SourceName)
  434. }
  435. st := streamStmt.Options.TYPE
  436. if st == "" {
  437. st = "mqtt"
  438. }
  439. if st != gn.NodeType {
  440. return nil, ILLEGAL, "", fmt.Errorf("source type %s does not match the stream type %s", gn.NodeType, st)
  441. }
  442. sInfo, err := convertStreamInfo(streamStmt)
  443. if err != nil {
  444. return nil, ILLEGAL, "", err
  445. }
  446. if sInfo.stmt.StreamType == ast.TypeTable && sInfo.stmt.Options.KIND == ast.StreamKindLookup {
  447. lookupTableChildren[string(sInfo.stmt.Name)] = sInfo.stmt.Options
  448. return nil, LOOKUPTABLE, string(sInfo.stmt.Name), nil
  449. } else {
  450. // Use the plan to calculate the schema and other meta info
  451. p := DataSourcePlan{
  452. name: sInfo.stmt.Name,
  453. streamStmt: sInfo.stmt,
  454. streamFields: sInfo.schema.ToJsonSchema(),
  455. isSchemaless: sInfo.schema == nil,
  456. iet: rule.Options.IsEventTime,
  457. allMeta: rule.Options.SendMetaToSink,
  458. }.Init()
  459. if sInfo.stmt.StreamType == ast.TypeStream {
  460. err = p.PruneColumns(nil)
  461. if err != nil {
  462. return nil, ILLEGAL, "", err
  463. }
  464. srcNode, e := transformSourceNode(p, nil, rule.Options)
  465. if e != nil {
  466. return nil, ILLEGAL, "", e
  467. }
  468. return srcNode, STREAM, string(sInfo.stmt.Name), nil
  469. } else {
  470. return nil, SCANTABLE, string(sInfo.stmt.Name), nil
  471. }
  472. }
  473. } else {
  474. sourceOption := &ast.Options{}
  475. err = cast.MapToStruct(gn.Props, sourceOption)
  476. if err != nil {
  477. return nil, ILLEGAL, "", err
  478. }
  479. sourceOption.TYPE = gn.NodeType
  480. switch sourceMeta.SourceType {
  481. case "stream":
  482. pp, err := operator.NewPreprocessor(true, nil, true, nil, rule.Options.IsEventTime, sourceOption.TIMESTAMP, sourceOption.TIMESTAMP_FORMAT, strings.EqualFold(sourceOption.FORMAT, message.FormatBinary), sourceOption.STRICT_VALIDATION)
  483. if err != nil {
  484. return nil, ILLEGAL, "", err
  485. }
  486. srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError)
  487. return srcNode, STREAM, nodeName, nil
  488. case "table":
  489. return nil, ILLEGAL, "", fmt.Errorf("anonymouse table source is not supported, please create it prior to the rule")
  490. }
  491. }
  492. return nil, ILLEGAL, "", errors.New("invalid source node")
  493. }
  494. func parseOrderBy(props map[string]interface{}, sourceNames []string) (*operator.OrderOp, error) {
  495. n := &graph.Orderby{}
  496. err := cast.MapToStruct(props, n)
  497. if err != nil {
  498. return nil, err
  499. }
  500. stmt := "SELECT * FROM unknown ORDER BY"
  501. for _, s := range n.Sorts {
  502. stmt += " " + s.Field + " "
  503. if s.Desc {
  504. stmt += "DESC"
  505. }
  506. }
  507. p, err := xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
  508. if err != nil {
  509. return nil, fmt.Errorf("invalid order by statement error: %v", err)
  510. }
  511. if len(p.SortFields) == 0 {
  512. return nil, fmt.Errorf("order by statement is empty")
  513. }
  514. return &operator.OrderOp{
  515. SortFields: p.SortFields,
  516. }, nil
  517. }
  518. func parseGroupBy(props map[string]interface{}, sourceNames []string) (*operator.AggregateOp, error) {
  519. n := &graph.Groupby{}
  520. err := cast.MapToStruct(props, n)
  521. if err != nil {
  522. return nil, err
  523. }
  524. if len(n.Dimensions) == 0 {
  525. return nil, fmt.Errorf("groupby must have at least one dimension")
  526. }
  527. stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
  528. p, err := xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
  529. if err != nil {
  530. return nil, fmt.Errorf("invalid join statement error: %v", err)
  531. }
  532. return &operator.AggregateOp{Dimensions: p.Dimensions}, nil
  533. }
  534. func parseJoinAst(props map[string]interface{}, sourceNames []string) (*ast.SelectStatement, error) {
  535. n := &graph.Join{}
  536. err := cast.MapToStruct(props, n)
  537. if err != nil {
  538. return nil, err
  539. }
  540. stmt := "SELECT * FROM " + n.From
  541. for _, join := range n.Joins {
  542. stmt += " " + join.Type + " JOIN " + join.Name + " ON " + join.On
  543. }
  544. return xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
  545. }
  546. func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
  547. n := &graph.Window{}
  548. err := cast.MapToStruct(props, n)
  549. if err != nil {
  550. return nil, err
  551. }
  552. if n.Size <= 0 {
  553. return nil, fmt.Errorf("window size %d is invalid", n.Size)
  554. }
  555. var (
  556. wt ast.WindowType
  557. length int
  558. interval int
  559. )
  560. switch strings.ToLower(n.Type) {
  561. case "tumblingwindow":
  562. wt = ast.TUMBLING_WINDOW
  563. if n.Interval != 0 && n.Interval != n.Size {
  564. return nil, fmt.Errorf("tumbling window interval must equal to size")
  565. }
  566. case "hoppingwindow":
  567. wt = ast.HOPPING_WINDOW
  568. if n.Interval <= 0 {
  569. return nil, fmt.Errorf("hopping window interval must be greater than 0")
  570. }
  571. if n.Interval > n.Size {
  572. return nil, fmt.Errorf("hopping window interval must be less than size")
  573. }
  574. case "sessionwindow":
  575. wt = ast.SESSION_WINDOW
  576. if n.Interval <= 0 {
  577. return nil, fmt.Errorf("hopping window interval must be greater than 0")
  578. }
  579. case "slidingwindow":
  580. wt = ast.SLIDING_WINDOW
  581. if n.Interval != 0 && n.Interval != n.Size {
  582. return nil, fmt.Errorf("tumbling window interval must equal to size")
  583. }
  584. case "countwindow":
  585. wt = ast.COUNT_WINDOW
  586. if n.Interval < 0 {
  587. return nil, fmt.Errorf("count window interval must be greater or equal to 0")
  588. }
  589. if n.Interval > n.Size {
  590. return nil, fmt.Errorf("count window interval must be less than size")
  591. }
  592. if n.Interval == 0 {
  593. n.Interval = n.Size
  594. }
  595. default:
  596. return nil, fmt.Errorf("unknown window type %s", n.Type)
  597. }
  598. if wt == ast.COUNT_WINDOW {
  599. length = n.Size
  600. interval = n.Interval
  601. } else {
  602. unit := 1
  603. switch strings.ToLower(n.Unit) {
  604. case "dd":
  605. unit = 24 * 3600 * 1000
  606. case "hh":
  607. unit = 3600 * 1000
  608. case "mi":
  609. unit = 60 * 1000
  610. case "ss":
  611. unit = 1000
  612. case "ms":
  613. unit = 1
  614. default:
  615. return nil, fmt.Errorf("Invalid unit %s", n.Unit)
  616. }
  617. length = n.Size * unit
  618. interval = n.Interval * unit
  619. }
  620. return &node.WindowConfig{
  621. Type: wt,
  622. Length: length,
  623. Interval: interval,
  624. }, nil
  625. }
  626. func parsePick(props map[string]interface{}, sourceNames []string) (*operator.ProjectOp, error) {
  627. n := &graph.Select{}
  628. err := cast.MapToStruct(props, n)
  629. if err != nil {
  630. return nil, err
  631. }
  632. stmt, err := xsql.NewParserWithSources(strings.NewReader("select "+strings.Join(n.Fields, ",")+" from nonexist"), sourceNames).Parse()
  633. if err != nil {
  634. return nil, err
  635. }
  636. t := ProjectPlan{
  637. fields: stmt.Fields,
  638. isAggregate: xsql.IsAggStatement(stmt),
  639. }.Init()
  640. return &operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, nil
  641. }
  642. func parseFunc(props map[string]interface{}, sourceNames []string) (*operator.FuncOp, error) {
  643. m, ok := props["expr"]
  644. if !ok {
  645. return nil, errors.New("no expr")
  646. }
  647. funcExpr, ok := m.(string)
  648. if !ok {
  649. return nil, fmt.Errorf("expr %v is not string", m)
  650. }
  651. stmt, err := xsql.NewParserWithSources(strings.NewReader("select "+funcExpr+" from nonexist"), sourceNames).Parse()
  652. if err != nil {
  653. return nil, err
  654. }
  655. f := stmt.Fields[0]
  656. c, ok := f.Expr.(*ast.Call)
  657. if !ok {
  658. // never happen
  659. return nil, fmt.Errorf("expr %s is not ast.Call", funcExpr)
  660. }
  661. var name string
  662. if f.AName != "" {
  663. name = f.AName
  664. } else {
  665. name = f.Name
  666. }
  667. return &operator.FuncOp{CallExpr: c, Name: name, IsAgg: function.IsAggFunc(name)}, nil
  668. }
  669. func parseFilter(props map[string]interface{}, sourceNames []string) (*operator.FilterOp, error) {
  670. m, ok := props["expr"]
  671. if !ok {
  672. return nil, errors.New("no expr")
  673. }
  674. conditionExpr, ok := m.(string)
  675. if !ok {
  676. return nil, fmt.Errorf("expr %v is not string", m)
  677. }
  678. p := xsql.NewParserWithSources(strings.NewReader("where "+conditionExpr), sourceNames)
  679. if exp, err := p.ParseCondition(); err != nil {
  680. return nil, err
  681. } else {
  682. if exp != nil {
  683. return &operator.FilterOp{Condition: exp}, nil
  684. }
  685. }
  686. return nil, fmt.Errorf("expr %v is not a condition", m)
  687. }
  688. func parseHaving(props map[string]interface{}, sourceNames []string) (*operator.HavingOp, error) {
  689. m, ok := props["expr"]
  690. if !ok {
  691. return nil, errors.New("no expr")
  692. }
  693. conditionExpr, ok := m.(string)
  694. if !ok {
  695. return nil, fmt.Errorf("expr %v is not string", m)
  696. }
  697. p := xsql.NewParserWithSources(strings.NewReader("where "+conditionExpr), sourceNames)
  698. if exp, err := p.ParseCondition(); err != nil {
  699. return nil, err
  700. } else {
  701. if exp != nil {
  702. return &operator.HavingOp{Condition: exp}, nil
  703. }
  704. }
  705. return nil, fmt.Errorf("expr %v is not a condition", m)
  706. }
  707. func parseSwitch(props map[string]interface{}, sourceNames []string) (*node.SwitchConfig, error) {
  708. n := &graph.Switch{}
  709. err := cast.MapToStruct(props, n)
  710. if err != nil {
  711. return nil, err
  712. }
  713. if len(n.Cases) == 0 {
  714. return nil, fmt.Errorf("switch node must have at least one case")
  715. }
  716. caseExprs := make([]ast.Expr, len(n.Cases))
  717. for i, c := range n.Cases {
  718. p := xsql.NewParserWithSources(strings.NewReader("where "+c), sourceNames)
  719. if exp, err := p.ParseCondition(); err != nil {
  720. return nil, fmt.Errorf("parse case %d error: %v", i, err)
  721. } else {
  722. if exp != nil {
  723. caseExprs[i] = exp
  724. }
  725. }
  726. }
  727. return &node.SwitchConfig{
  728. Cases: caseExprs,
  729. StopAtFirstMatch: n.StopAtFirstMatch,
  730. }, nil
  731. }