planner_graph.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "errors"
  17. "fmt"
  18. "strings"
  19. "github.com/lf-edge/ekuiper/internal/binder/function"
  20. store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
  21. "github.com/lf-edge/ekuiper/internal/topo"
  22. "github.com/lf-edge/ekuiper/internal/topo/graph"
  23. "github.com/lf-edge/ekuiper/internal/topo/node"
  24. "github.com/lf-edge/ekuiper/internal/topo/operator"
  25. "github.com/lf-edge/ekuiper/internal/xsql"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. "github.com/lf-edge/ekuiper/pkg/ast"
  28. "github.com/lf-edge/ekuiper/pkg/cast"
  29. "github.com/lf-edge/ekuiper/pkg/kv"
  30. "github.com/lf-edge/ekuiper/pkg/message"
  31. )
  32. type genNodeFunc func(name string, props map[string]interface{}, options *api.RuleOption) (api.TopNode, error)
  33. var extNodes = map[string]genNodeFunc{}
  34. type sourceType int
  35. const (
  36. ILLEGAL sourceType = iota
  37. STREAM
  38. SCANTABLE
  39. LOOKUPTABLE
  40. )
  41. // PlanByGraph returns a topo.Topo object by a graph
  42. func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
  43. ruleGraph := rule.Graph
  44. if ruleGraph == nil {
  45. return nil, errors.New("no graph")
  46. }
  47. tp, err := topo.NewWithNameAndOptions(rule.Id, rule.Options)
  48. if err != nil {
  49. return nil, err
  50. }
  51. var (
  52. nodeMap = make(map[string]api.TopNode)
  53. sinks = make(map[string]bool)
  54. sources = make(map[string]bool)
  55. store kv.KeyValue
  56. lookupTableChildren = make(map[string]*ast.Options)
  57. scanTableEmitters []string
  58. sourceNames []string
  59. streamEmitters = make(map[string]struct{})
  60. )
  61. for _, srcName := range ruleGraph.Topo.Sources {
  62. gn, ok := ruleGraph.Nodes[srcName]
  63. if !ok {
  64. return nil, fmt.Errorf("source node %s not defined", srcName)
  65. }
  66. if _, ok := ruleGraph.Topo.Edges[srcName]; !ok {
  67. return nil, fmt.Errorf("no edge defined for source node %s", srcName)
  68. }
  69. srcNode, srcType, name, err := parseSource(srcName, gn, rule, store, lookupTableChildren)
  70. if err != nil {
  71. return nil, fmt.Errorf("parse source %s with %v error: %w", srcName, gn.Props, err)
  72. }
  73. switch srcType {
  74. case STREAM:
  75. streamEmitters[name] = struct{}{}
  76. sourceNames = append(sourceNames, name)
  77. case SCANTABLE:
  78. scanTableEmitters = append(scanTableEmitters, name)
  79. sourceNames = append(sourceNames, name)
  80. case LOOKUPTABLE:
  81. sourceNames = append(sourceNames, name)
  82. }
  83. if srcNode != nil {
  84. nodeMap[srcName] = srcNode
  85. tp.AddSrc(srcNode)
  86. }
  87. sources[srcName] = true
  88. }
  89. for nodeName, gn := range ruleGraph.Nodes {
  90. switch gn.Type {
  91. case "source": // handled above,
  92. continue
  93. case "sink":
  94. if _, ok := ruleGraph.Topo.Edges[nodeName]; ok {
  95. return nil, fmt.Errorf("sink %s has edge", nodeName)
  96. }
  97. nodeMap[nodeName] = node.NewSinkNode(nodeName, gn.NodeType, gn.Props)
  98. sinks[nodeName] = true
  99. case "operator":
  100. if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
  101. return nil, fmt.Errorf("no edge defined for operator node %s", nodeName)
  102. }
  103. nt := strings.ToLower(gn.NodeType)
  104. switch nt {
  105. case "watermark":
  106. n, err := parseWatermark(gn.Props, streamEmitters)
  107. if err != nil {
  108. return nil, fmt.Errorf("parse watermark %s with %v error: %w", nodeName, gn.Props, err)
  109. }
  110. op := node.NewWatermarkOp(nodeName, n.SendWatermark, n.Emitters, rule.Options)
  111. nodeMap[nodeName] = op
  112. case "function":
  113. fop, err := parseFunc(gn.Props, sourceNames)
  114. if err != nil {
  115. return nil, fmt.Errorf("parse function %s with %v error: %w", nodeName, gn.Props, err)
  116. }
  117. op := Transform(fop, nodeName, rule.Options)
  118. nodeMap[nodeName] = op
  119. case "aggfunc":
  120. fop, err := parseFunc(gn.Props, sourceNames)
  121. if err != nil {
  122. return nil, fmt.Errorf("parse aggfunc %s with %v error: %w", nodeName, gn.Props, err)
  123. }
  124. fop.IsAgg = true
  125. op := Transform(fop, nodeName, rule.Options)
  126. nodeMap[nodeName] = op
  127. case "filter":
  128. fop, err := parseFilter(gn.Props, sourceNames)
  129. if err != nil {
  130. return nil, fmt.Errorf("parse filter %s with %v error: %w", nodeName, gn.Props, err)
  131. }
  132. op := Transform(fop, nodeName, rule.Options)
  133. nodeMap[nodeName] = op
  134. case "pick":
  135. pop, err := parsePick(gn.Props, sourceNames)
  136. if err != nil {
  137. return nil, fmt.Errorf("parse pick %s with %v error: %w", nodeName, gn.Props, err)
  138. }
  139. op := Transform(pop, nodeName, rule.Options)
  140. nodeMap[nodeName] = op
  141. case "window":
  142. wconf, err := parseWindow(gn.Props)
  143. if err != nil {
  144. return nil, fmt.Errorf("parse window conf %s with %v error: %w", nodeName, gn.Props, err)
  145. }
  146. op, err := node.NewWindowOp(nodeName, *wconf, rule.Options)
  147. if err != nil {
  148. return nil, fmt.Errorf("parse window %s with %v error: %w", nodeName, gn.Props, err)
  149. }
  150. nodeMap[nodeName] = op
  151. case "join":
  152. stmt, err := parseJoinAst(gn.Props, sourceNames)
  153. if err != nil {
  154. return nil, fmt.Errorf("parse join %s with %v error: %w", nodeName, gn.Props, err)
  155. }
  156. fromNode := stmt.Sources[0].(*ast.Table)
  157. if _, ok := streamEmitters[fromNode.Name]; !ok {
  158. return nil, fmt.Errorf("parse join %s with %v error: join source %s is not a stream", nodeName, gn.Props, fromNode.Name)
  159. }
  160. hasLookup := false
  161. if stmt.Joins != nil {
  162. if len(lookupTableChildren) > 0 {
  163. var joins []ast.Join
  164. for _, join := range stmt.Joins {
  165. if hasLookup {
  166. return nil, fmt.Errorf("parse join %s with %v error: only support to join one lookup table with one stream", nodeName, gn.Props)
  167. }
  168. if streamOpt, ok := lookupTableChildren[join.Name]; ok {
  169. hasLookup = true
  170. lookupPlan := LookupPlan{
  171. joinExpr: join,
  172. options: streamOpt,
  173. }
  174. if !lookupPlan.validateAndExtractCondition() {
  175. return nil, fmt.Errorf("parse join %s with %v error: join condition %s is invalid, at least one equi-join predicate is required", nodeName, gn.Props, join.Expr)
  176. }
  177. op, err := node.NewLookupNode(lookupPlan.joinExpr.Name, lookupPlan.fields, lookupPlan.keys, lookupPlan.joinExpr.JoinType, lookupPlan.valvars, lookupPlan.options, rule.Options)
  178. if err != nil {
  179. return nil, fmt.Errorf("parse join %s with %v error: fail to create lookup node", nodeName, gn.Props)
  180. }
  181. nodeMap[nodeName] = op
  182. } else {
  183. joins = append(joins, join)
  184. }
  185. }
  186. stmt.Joins = joins
  187. }
  188. // Not all joins are lookup joins, so we need to create a join plan for the remaining joins
  189. if len(stmt.Joins) > 0 && !hasLookup {
  190. if len(scanTableEmitters) > 0 {
  191. return nil, fmt.Errorf("parse join %s with %v error: do not support scan table %s yet", nodeName, gn.Props, scanTableEmitters)
  192. }
  193. jop := &operator.JoinOp{Joins: stmt.Joins, From: fromNode}
  194. op := Transform(jop, nodeName, rule.Options)
  195. nodeMap[nodeName] = op
  196. }
  197. }
  198. case "groupby":
  199. gop, err := parseGroupBy(gn.Props, sourceNames)
  200. if err != nil {
  201. return nil, fmt.Errorf("parse groupby %s with %v error: %w", nodeName, gn.Props, err)
  202. }
  203. op := Transform(gop, nodeName, rule.Options)
  204. nodeMap[nodeName] = op
  205. case "orderby":
  206. oop, err := parseOrderBy(gn.Props, sourceNames)
  207. if err != nil {
  208. return nil, fmt.Errorf("parse orderby %s with %v error: %w", nodeName, gn.Props, err)
  209. }
  210. op := Transform(oop, nodeName, rule.Options)
  211. nodeMap[nodeName] = op
  212. case "switch":
  213. sconf, err := parseSwitch(gn.Props, sourceNames)
  214. if err != nil {
  215. return nil, fmt.Errorf("parse switch %s with %v error: %w", nodeName, gn.Props, err)
  216. }
  217. op, err := node.NewSwitchNode(nodeName, sconf, rule.Options)
  218. if err != nil {
  219. return nil, fmt.Errorf("create switch %s with %v error: %w", nodeName, gn.Props, err)
  220. }
  221. nodeMap[nodeName] = op
  222. default:
  223. gnf, ok := extNodes[nt]
  224. if !ok {
  225. return nil, fmt.Errorf("unknown operator type %s", gn.NodeType)
  226. }
  227. op, err := gnf(nodeName, gn.Props, rule.Options)
  228. if err != nil {
  229. return nil, err
  230. }
  231. nodeMap[nodeName] = op
  232. }
  233. default:
  234. return nil, fmt.Errorf("unknown node type %s", gn.Type)
  235. }
  236. }
  237. // validate source node
  238. for _, nodeName := range ruleGraph.Topo.Sources {
  239. if _, ok := sources[nodeName]; !ok {
  240. return nil, fmt.Errorf("source %s is not a source type node", nodeName)
  241. }
  242. }
  243. // reverse edges, value is a 2-dim array. Only switch node will have the second dim
  244. reversedEdges := make(map[string][][]string)
  245. rclone := make(map[string][]string)
  246. for fromNode, toNodes := range ruleGraph.Topo.Edges {
  247. if _, ok := ruleGraph.Nodes[fromNode]; !ok {
  248. return nil, fmt.Errorf("node %s is not defined", fromNode)
  249. }
  250. for i, toNode := range toNodes {
  251. switch tn := toNode.(type) {
  252. case string:
  253. if _, ok := ruleGraph.Nodes[tn]; !ok {
  254. return nil, fmt.Errorf("node %s is not defined", tn)
  255. }
  256. if _, ok := reversedEdges[tn]; !ok {
  257. reversedEdges[tn] = make([][]string, 1)
  258. }
  259. reversedEdges[tn][0] = append(reversedEdges[tn][0], fromNode)
  260. rclone[tn] = append(rclone[tn], fromNode)
  261. case []interface{}:
  262. for _, tni := range tn {
  263. tnn, ok := tni.(string)
  264. if !ok { // never happen
  265. return nil, fmt.Errorf("invalid edge toNode %v", toNode)
  266. }
  267. if _, ok := ruleGraph.Nodes[tnn]; !ok {
  268. return nil, fmt.Errorf("node %s is not defined", tnn)
  269. }
  270. for len(reversedEdges[tnn]) <= i {
  271. reversedEdges[tnn] = append(reversedEdges[tnn], []string{})
  272. }
  273. reversedEdges[tnn][i] = append(reversedEdges[tnn][i], fromNode)
  274. rclone[tnn] = append(rclone[tnn], fromNode)
  275. }
  276. }
  277. }
  278. }
  279. // sort the nodes by topological order
  280. nodesInOrder := make([]string, len(ruleGraph.Nodes))
  281. i := 0
  282. genNodesInOrder(ruleGraph.Topo.Sources, ruleGraph.Topo.Edges, rclone, nodesInOrder, i)
  283. // validate the typo
  284. // the map is to record the output for each node
  285. dataFlow := make(map[string]*graph.IOType)
  286. for _, n := range nodesInOrder {
  287. gn := ruleGraph.Nodes[n]
  288. if gn == nil {
  289. return nil, fmt.Errorf("can't find node %s", n)
  290. }
  291. if gn.Type == "source" {
  292. dataFlow[n] = &graph.IOType{
  293. Type: graph.IOINPUT_TYPE_ROW,
  294. RowType: graph.IOROW_TYPE_SINGLE,
  295. CollectionType: graph.IOCOLLECTION_TYPE_ANY,
  296. AllowMulti: false,
  297. }
  298. } else if gn.Type == "sink" {
  299. continue
  300. } else {
  301. nodeIO, ok := graph.OpIO[strings.ToLower(gn.NodeType)]
  302. if !ok {
  303. return nil, fmt.Errorf("can't find the io definition for node type %s", gn.NodeType)
  304. }
  305. dataInCondition := nodeIO[0]
  306. indim := reversedEdges[n]
  307. var innodes []string
  308. for _, in := range indim {
  309. innodes = append(innodes, in...)
  310. }
  311. if len(innodes) > 1 {
  312. if dataInCondition.AllowMulti {
  313. // special case for join which does not allow multiple streams
  314. if gn.NodeType == "join" {
  315. joinStreams := 0
  316. for _, innode := range innodes {
  317. if _, isLookup := lookupTableChildren[innode]; !isLookup {
  318. joinStreams++
  319. }
  320. if joinStreams > 1 {
  321. return nil, fmt.Errorf("join node %s does not allow multiple stream inputs", n)
  322. }
  323. }
  324. }
  325. for _, innode := range innodes {
  326. _, err = graph.Fit(dataFlow[innode], dataInCondition)
  327. if err != nil {
  328. return nil, fmt.Errorf("node %s output does not match node %s input: %v", innode, n, err)
  329. }
  330. }
  331. } else {
  332. return nil, fmt.Errorf("operator %s of type %s does not allow multiple inputs", n, gn.NodeType)
  333. }
  334. } else if len(innodes) == 1 {
  335. _, err := graph.Fit(dataFlow[innodes[0]], dataInCondition)
  336. if err != nil {
  337. return nil, fmt.Errorf("node %s output does not match node %s input: %v", innodes[0], n, err)
  338. }
  339. } else {
  340. return nil, fmt.Errorf("operator %s of type %s has no input", n, gn.NodeType)
  341. }
  342. out := nodeIO[1]
  343. in := dataFlow[innodes[0]]
  344. dataFlow[n] = graph.MapOut(in, out)
  345. // convert filter to having if the input is aggregated
  346. if gn.NodeType == "filter" && in.Type == graph.IOINPUT_TYPE_COLLECTION && in.CollectionType == graph.IOCOLLECTION_TYPE_GROUPED {
  347. fop, err := parseHaving(gn.Props, sourceNames)
  348. if err != nil {
  349. return nil, err
  350. }
  351. op := Transform(fop, n, rule.Options)
  352. nodeMap[n] = op
  353. }
  354. }
  355. }
  356. // add the linkages
  357. for nodeName, fromNodes := range reversedEdges {
  358. totalLen := 0
  359. for _, fromNode := range fromNodes {
  360. totalLen += len(fromNode)
  361. }
  362. inputs := make([]api.Emitter, 0, totalLen)
  363. for i, fromNode := range fromNodes {
  364. for _, from := range fromNode {
  365. if i == 0 {
  366. if src, ok := nodeMap[from].(api.Emitter); ok {
  367. inputs = append(inputs, src)
  368. }
  369. } else {
  370. switch sn := nodeMap[from].(type) {
  371. case *node.SwitchNode:
  372. inputs = append(inputs, sn.GetEmitter(i))
  373. default:
  374. return nil, fmt.Errorf("node %s is not a switch node but have multiple output", from)
  375. }
  376. }
  377. }
  378. }
  379. n := nodeMap[nodeName]
  380. if n == nil {
  381. return nil, fmt.Errorf("node %s is not defined", nodeName)
  382. }
  383. if _, ok := sinks[nodeName]; ok {
  384. tp.AddSink(inputs, n.(*node.SinkNode))
  385. } else {
  386. tp.AddOperator(inputs, n.(node.OperatorNode))
  387. }
  388. }
  389. return tp, nil
  390. }
  391. func genNodesInOrder(toNodes []string, edges map[string][]interface{}, flatReversedEdges map[string][]string, nodesInOrder []string, i int) int {
  392. for _, src := range toNodes {
  393. if len(flatReversedEdges[src]) > 1 {
  394. flatReversedEdges[src] = flatReversedEdges[src][1:]
  395. continue
  396. }
  397. nodesInOrder[i] = src
  398. i++
  399. tns := make([]string, 0, len(edges[src]))
  400. for _, toNode := range edges[src] {
  401. switch toNode.(type) {
  402. case string:
  403. tns = append(tns, toNode.(string))
  404. case []interface{}:
  405. for _, tni := range toNode.([]interface{}) {
  406. tns = append(tns, tni.(string))
  407. }
  408. }
  409. }
  410. i = genNodesInOrder(tns, edges, flatReversedEdges, nodesInOrder, i)
  411. }
  412. return i
  413. }
  414. func parseSource(nodeName string, gn *api.GraphNode, rule *api.Rule, store kv.KeyValue, lookupTableChildren map[string]*ast.Options) (*node.SourceNode, sourceType, string, error) {
  415. sourceMeta := &api.SourceMeta{
  416. SourceType: "stream",
  417. }
  418. err := cast.MapToStruct(gn.Props, sourceMeta)
  419. if err != nil {
  420. return nil, ILLEGAL, "", err
  421. }
  422. if sourceMeta.SourceType != "stream" && sourceMeta.SourceType != "table" {
  423. return nil, ILLEGAL, "", fmt.Errorf("source type %s not supported", sourceMeta.SourceType)
  424. }
  425. // If source name is specified, find the created stream/table from store
  426. if sourceMeta.SourceName != "" {
  427. if store == nil {
  428. store, err = store2.GetKV("stream")
  429. if err != nil {
  430. return nil, ILLEGAL, "", err
  431. }
  432. }
  433. streamStmt, e := xsql.GetDataSource(store, sourceMeta.SourceName)
  434. if e != nil {
  435. return nil, ILLEGAL, "", fmt.Errorf("fail to get stream %s, please check if stream is created", sourceMeta.SourceName)
  436. }
  437. if streamStmt.StreamType == ast.TypeStream && sourceMeta.SourceType == "table" {
  438. return nil, ILLEGAL, "", fmt.Errorf("stream %s is not a table", sourceMeta.SourceName)
  439. } else if streamStmt.StreamType == ast.TypeTable && sourceMeta.SourceType == "stream" {
  440. return nil, ILLEGAL, "", fmt.Errorf("table %s is not a stream", sourceMeta.SourceName)
  441. }
  442. st := streamStmt.Options.TYPE
  443. if st == "" {
  444. st = "mqtt"
  445. }
  446. if st != gn.NodeType {
  447. return nil, ILLEGAL, "", fmt.Errorf("source type %s does not match the stream type %s", gn.NodeType, st)
  448. }
  449. sInfo, err := convertStreamInfo(streamStmt)
  450. if err != nil {
  451. return nil, ILLEGAL, "", err
  452. }
  453. if sInfo.stmt.StreamType == ast.TypeTable && sInfo.stmt.Options.KIND == ast.StreamKindLookup {
  454. lookupTableChildren[string(sInfo.stmt.Name)] = sInfo.stmt.Options
  455. return nil, LOOKUPTABLE, string(sInfo.stmt.Name), nil
  456. } else {
  457. // Use the plan to calculate the schema and other meta info
  458. p := DataSourcePlan{
  459. name: sInfo.stmt.Name,
  460. streamStmt: sInfo.stmt,
  461. streamFields: sInfo.schema.ToJsonSchema(),
  462. isSchemaless: sInfo.schema == nil,
  463. iet: rule.Options.IsEventTime,
  464. allMeta: rule.Options.SendMetaToSink,
  465. }.Init()
  466. if sInfo.stmt.StreamType == ast.TypeStream {
  467. err = p.PruneColumns(nil)
  468. if err != nil {
  469. return nil, ILLEGAL, "", err
  470. }
  471. srcNode, e := transformSourceNode(p, nil, rule.Options)
  472. if e != nil {
  473. return nil, ILLEGAL, "", e
  474. }
  475. return srcNode, STREAM, string(sInfo.stmt.Name), nil
  476. } else {
  477. return nil, SCANTABLE, string(sInfo.stmt.Name), nil
  478. }
  479. }
  480. } else {
  481. sourceOption := &ast.Options{}
  482. err = cast.MapToStruct(gn.Props, sourceOption)
  483. if err != nil {
  484. return nil, ILLEGAL, "", err
  485. }
  486. sourceOption.TYPE = gn.NodeType
  487. if sourceOption.SCHEMAID == "" && gn.Props["schemaName"] != nil && gn.Props["schemaMessage"] != nil {
  488. schemaName, ok1 := gn.Props["schemaName"].(string)
  489. schemaMessage, ok2 := gn.Props["schemaMessage"].(string)
  490. if ok1 && ok2 {
  491. sourceOption.SCHEMAID = schemaName + "." + schemaMessage
  492. }
  493. }
  494. switch sourceMeta.SourceType {
  495. case "stream":
  496. pp, err := operator.NewPreprocessor(true, nil, true, nil, rule.Options.IsEventTime, sourceOption.TIMESTAMP, sourceOption.TIMESTAMP_FORMAT, strings.EqualFold(sourceOption.FORMAT, message.FormatBinary), sourceOption.STRICT_VALIDATION)
  497. if err != nil {
  498. return nil, ILLEGAL, "", err
  499. }
  500. srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError, nil)
  501. return srcNode, STREAM, nodeName, nil
  502. case "table":
  503. return nil, ILLEGAL, "", fmt.Errorf("anonymouse table source is not supported, please create it prior to the rule")
  504. }
  505. }
  506. return nil, ILLEGAL, "", errors.New("invalid source node")
  507. }
  508. func parseOrderBy(props map[string]interface{}, sourceNames []string) (*operator.OrderOp, error) {
  509. n := &graph.Orderby{}
  510. err := cast.MapToStruct(props, n)
  511. if err != nil {
  512. return nil, err
  513. }
  514. stmt := "SELECT * FROM unknown ORDER BY"
  515. for _, s := range n.Sorts {
  516. stmt += " " + s.Field + " "
  517. if s.Desc {
  518. stmt += "DESC"
  519. }
  520. }
  521. p, err := xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
  522. if err != nil {
  523. return nil, fmt.Errorf("invalid order by statement error: %v", err)
  524. }
  525. if len(p.SortFields) == 0 {
  526. return nil, fmt.Errorf("order by statement is empty")
  527. }
  528. return &operator.OrderOp{
  529. SortFields: p.SortFields,
  530. }, nil
  531. }
  532. func parseGroupBy(props map[string]interface{}, sourceNames []string) (*operator.AggregateOp, error) {
  533. n := &graph.Groupby{}
  534. err := cast.MapToStruct(props, n)
  535. if err != nil {
  536. return nil, err
  537. }
  538. if len(n.Dimensions) == 0 {
  539. return nil, fmt.Errorf("groupby must have at least one dimension")
  540. }
  541. stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
  542. p, err := xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
  543. if err != nil {
  544. return nil, fmt.Errorf("invalid join statement error: %v", err)
  545. }
  546. return &operator.AggregateOp{Dimensions: p.Dimensions}, nil
  547. }
  548. func parseJoinAst(props map[string]interface{}, sourceNames []string) (*ast.SelectStatement, error) {
  549. n := &graph.Join{}
  550. err := cast.MapToStruct(props, n)
  551. if err != nil {
  552. return nil, err
  553. }
  554. stmt := "SELECT * FROM " + n.From
  555. for _, join := range n.Joins {
  556. stmt += " " + join.Type + " JOIN " + join.Name + " ON " + join.On
  557. }
  558. return xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
  559. }
  560. func parseWatermark(props map[string]interface{}, streamEmitters map[string]struct{}) (*graph.Watermark, error) {
  561. n := &graph.Watermark{}
  562. err := cast.MapToStruct(props, n)
  563. if err != nil {
  564. return nil, err
  565. }
  566. if len(n.Emitters) == 0 {
  567. return nil, fmt.Errorf("watermark must have at least one emitter")
  568. }
  569. for _, e := range n.Emitters {
  570. if _, ok := streamEmitters[e]; !ok {
  571. return nil, fmt.Errorf("emitter %s does not exist", e)
  572. }
  573. }
  574. return n, nil
  575. }
  576. func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
  577. n := &graph.Window{}
  578. err := cast.MapToStruct(props, n)
  579. if err != nil {
  580. return nil, err
  581. }
  582. if n.Size <= 0 {
  583. return nil, fmt.Errorf("window size %d is invalid", n.Size)
  584. }
  585. var (
  586. wt ast.WindowType
  587. length int
  588. interval int
  589. rawInterval int
  590. )
  591. switch strings.ToLower(n.Type) {
  592. case "tumblingwindow":
  593. wt = ast.TUMBLING_WINDOW
  594. if n.Interval != 0 && n.Interval != n.Size {
  595. return nil, fmt.Errorf("tumbling window interval must equal to size")
  596. }
  597. rawInterval = n.Size
  598. case "hoppingwindow":
  599. wt = ast.HOPPING_WINDOW
  600. if n.Interval <= 0 {
  601. return nil, fmt.Errorf("hopping window interval must be greater than 0")
  602. }
  603. if n.Interval > n.Size {
  604. return nil, fmt.Errorf("hopping window interval must be less than size")
  605. }
  606. rawInterval = n.Interval
  607. case "sessionwindow":
  608. wt = ast.SESSION_WINDOW
  609. if n.Interval <= 0 {
  610. return nil, fmt.Errorf("hopping window interval must be greater than 0")
  611. }
  612. rawInterval = n.Size
  613. case "slidingwindow":
  614. wt = ast.SLIDING_WINDOW
  615. if n.Interval != 0 && n.Interval != n.Size {
  616. return nil, fmt.Errorf("tumbling window interval must equal to size")
  617. }
  618. case "countwindow":
  619. wt = ast.COUNT_WINDOW
  620. if n.Interval < 0 {
  621. return nil, fmt.Errorf("count window interval must be greater or equal to 0")
  622. }
  623. if n.Interval > n.Size {
  624. return nil, fmt.Errorf("count window interval must be less than size")
  625. }
  626. if n.Interval == 0 {
  627. n.Interval = n.Size
  628. }
  629. default:
  630. return nil, fmt.Errorf("unknown window type %s", n.Type)
  631. }
  632. var timeUnit ast.Token
  633. if wt == ast.COUNT_WINDOW {
  634. length = n.Size
  635. interval = n.Interval
  636. } else {
  637. unit := 1
  638. switch strings.ToLower(n.Unit) {
  639. case "dd":
  640. unit = 24 * 3600 * 1000
  641. timeUnit = ast.DD
  642. case "hh":
  643. unit = 3600 * 1000
  644. timeUnit = ast.HH
  645. case "mi":
  646. unit = 60 * 1000
  647. timeUnit = ast.MI
  648. case "ss":
  649. unit = 1000
  650. timeUnit = ast.SS
  651. case "ms":
  652. unit = 1
  653. timeUnit = ast.MS
  654. default:
  655. return nil, fmt.Errorf("Invalid unit %s", n.Unit)
  656. }
  657. length = n.Size * unit
  658. interval = n.Interval * unit
  659. }
  660. return &node.WindowConfig{
  661. RawInterval: rawInterval,
  662. Type: wt,
  663. Length: int64(length),
  664. Interval: int64(interval),
  665. TimeUnit: timeUnit,
  666. }, nil
  667. }
  668. func parsePick(props map[string]interface{}, sourceNames []string) (*operator.ProjectOp, error) {
  669. n := &graph.Select{}
  670. err := cast.MapToStruct(props, n)
  671. if err != nil {
  672. return nil, err
  673. }
  674. stmt, err := xsql.NewParserWithSources(strings.NewReader("select "+strings.Join(n.Fields, ",")+" from nonexist"), sourceNames).Parse()
  675. if err != nil {
  676. return nil, err
  677. }
  678. t := ProjectPlan{
  679. fields: stmt.Fields,
  680. isAggregate: xsql.WithAggFields(stmt),
  681. }.Init()
  682. return &operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, ExceptNames: t.exceptNames, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, nil
  683. }
  684. func parseFunc(props map[string]interface{}, sourceNames []string) (*operator.FuncOp, error) {
  685. m, ok := props["expr"]
  686. if !ok {
  687. return nil, errors.New("no expr")
  688. }
  689. funcExpr, ok := m.(string)
  690. if !ok {
  691. return nil, fmt.Errorf("expr %v is not string", m)
  692. }
  693. stmt, err := xsql.NewParserWithSources(strings.NewReader("select "+funcExpr+" from nonexist"), sourceNames).Parse()
  694. if err != nil {
  695. return nil, err
  696. }
  697. f := stmt.Fields[0]
  698. c, ok := f.Expr.(*ast.Call)
  699. if !ok {
  700. // never happen
  701. return nil, fmt.Errorf("expr %s is not ast.Call", funcExpr)
  702. }
  703. var name string
  704. if f.AName != "" {
  705. name = f.AName
  706. } else {
  707. name = f.Name
  708. }
  709. return &operator.FuncOp{CallExpr: c, Name: name, IsAgg: function.IsAggFunc(name)}, nil
  710. }
  711. func parseFilter(props map[string]interface{}, sourceNames []string) (*operator.FilterOp, error) {
  712. m, ok := props["expr"]
  713. if !ok {
  714. return nil, errors.New("no expr")
  715. }
  716. conditionExpr, ok := m.(string)
  717. if !ok {
  718. return nil, fmt.Errorf("expr %v is not string", m)
  719. }
  720. p := xsql.NewParserWithSources(strings.NewReader("where "+conditionExpr), sourceNames)
  721. if exp, err := p.ParseCondition(); err != nil {
  722. return nil, err
  723. } else {
  724. if exp != nil {
  725. return &operator.FilterOp{Condition: exp}, nil
  726. }
  727. }
  728. return nil, fmt.Errorf("expr %v is not a condition", m)
  729. }
  730. func parseHaving(props map[string]interface{}, sourceNames []string) (*operator.HavingOp, error) {
  731. m, ok := props["expr"]
  732. if !ok {
  733. return nil, errors.New("no expr")
  734. }
  735. conditionExpr, ok := m.(string)
  736. if !ok {
  737. return nil, fmt.Errorf("expr %v is not string", m)
  738. }
  739. p := xsql.NewParserWithSources(strings.NewReader("where "+conditionExpr), sourceNames)
  740. if exp, err := p.ParseCondition(); err != nil {
  741. return nil, err
  742. } else {
  743. if exp != nil {
  744. return &operator.HavingOp{Condition: exp}, nil
  745. }
  746. }
  747. return nil, fmt.Errorf("expr %v is not a condition", m)
  748. }
  749. func parseSwitch(props map[string]interface{}, sourceNames []string) (*node.SwitchConfig, error) {
  750. n := &graph.Switch{}
  751. err := cast.MapToStruct(props, n)
  752. if err != nil {
  753. return nil, err
  754. }
  755. if len(n.Cases) == 0 {
  756. return nil, fmt.Errorf("switch node must have at least one case")
  757. }
  758. caseExprs := make([]ast.Expr, len(n.Cases))
  759. for i, c := range n.Cases {
  760. p := xsql.NewParserWithSources(strings.NewReader("where "+c), sourceNames)
  761. if exp, err := p.ParseCondition(); err != nil {
  762. return nil, fmt.Errorf("parse case %d error: %v", i, err)
  763. } else {
  764. if exp != nil {
  765. caseExprs[i] = exp
  766. }
  767. }
  768. }
  769. return &node.SwitchConfig{
  770. Cases: caseExprs,
  771. StopAtFirstMatch: n.StopAtFirstMatch,
  772. }, nil
  773. }