rule_migration.go

// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"

	"github.com/lf-edge/ekuiper/internal/binder/function"
	"github.com/lf-edge/ekuiper/internal/binder/io"
	"github.com/lf-edge/ekuiper/internal/meta"
	store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
	"github.com/lf-edge/ekuiper/internal/plugin"
	"github.com/lf-edge/ekuiper/internal/processor"
	"github.com/lf-edge/ekuiper/internal/topo/graph"
	"github.com/lf-edge/ekuiper/internal/topo/node/conf"
	"github.com/lf-edge/ekuiper/internal/xsql"
	"github.com/lf-edge/ekuiper/pkg/api"
	"github.com/lf-edge/ekuiper/pkg/ast"
	"github.com/lf-edge/ekuiper/pkg/cast"
)
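
// RuleMigrationProcessor exports rules together with the resources they depend on.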
type RuleMigrationProcessor struct {
	r *processor.RuleProcessor
	s *processor.StreamProcessor
}

func NewRuleMigrationProcessor(r *processor.RuleProcessor, s *processor.StreamProcessor) *RuleMigrationProcessor {
	return &RuleMigrationProcessor{
		r: r,
		s: s,
	}
}

func newDependencies() *dependencies {
	return &dependencies{
		sourceConfigKeys: map[string][]string{},
		sinkConfigKeys:   map[string][]string{},
	}
}

// dependencies records all the resources referenced by the exported rules;
// the connection related configs are copied by hardcoded keys.
type dependencies struct {
	rules            []string
	streams          []string
	tables           []string
	sources          []string
	sinks            []string
	sourceConfigKeys map[string][]string
	sinkConfigKeys   map[string][]string
	functions        []string
	schemas          []string
}
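
// ruleTraverse walks a rule, either its SQL text or its graph definition, and
// records every stream, table, source/sink type, configuration key, schema and
// function it references into de.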
func ruleTraverse(rule *api.Rule, de *dependencies) {
	sql := rule.Sql
	ruleGraph := rule.Graph
	if sql != "" {
		stmt, err := xsql.GetStatementFromSql(sql)
		if err != nil {
			return
		}
		store, err := store2.GetKV("stream")
		if err != nil {
			return
		}
		// streams
		streamsFromStmt := xsql.GetStreams(stmt)
		for _, s := range streamsFromStmt {
			streamStmt, err := xsql.GetDataSource(store, s)
			if err != nil {
				continue
			}
			if streamStmt.StreamType == ast.TypeStream {
				// get streams
				de.streams = append(de.streams, string(streamStmt.Name))
			} else if streamStmt.StreamType == ast.TypeTable {
				// get tables
				de.tables = append(de.tables, string(streamStmt.Name))
			}
			// get source type
			de.sources = append(de.sources, streamStmt.Options.TYPE)
			// get config key
			_, ok := de.sourceConfigKeys[streamStmt.Options.TYPE]
			if ok {
				de.sourceConfigKeys[streamStmt.Options.TYPE] = append(de.sourceConfigKeys[streamStmt.Options.TYPE], streamStmt.Options.CONF_KEY)
			} else {
				var confKeys []string
				confKeys = append(confKeys, streamStmt.Options.CONF_KEY)
				de.sourceConfigKeys[streamStmt.Options.TYPE] = confKeys
			}
			// get schema id
			if streamStmt.Options.SCHEMAID != "" {
				r := strings.Split(streamStmt.Options.SCHEMAID, ".")
				de.schemas = append(de.schemas, streamStmt.Options.FORMAT+"_"+r[0])
			}
		}
		// actions
		for _, m := range rule.Actions {
			for name, action := range m {
				props, _ := action.(map[string]interface{})
				de.sinks = append(de.sinks, name)
				resourceId, ok := props[conf.ResourceID].(string)
				if ok {
					_, ok := de.sinkConfigKeys[name]
					if ok {
						de.sinkConfigKeys[name] = append(de.sinkConfigKeys[name], resourceId)
					} else {
						var confKeys []string
						confKeys = append(confKeys, resourceId)
						de.sinkConfigKeys[name] = confKeys
					}
				}
				format, ok := props["format"].(string)
				if ok && format != "json" {
					schemaId, ok := props["schemaId"].(string)
					if ok {
						r := strings.Split(schemaId, ".")
						de.schemas = append(de.schemas, format+"_"+r[0])
					}
				}
			}
		}
		// functions
		ast.WalkFunc(stmt, func(n ast.Node) bool {
			switch f := n.(type) {
			case *ast.Call:
				de.functions = append(de.functions, f.Name)
			}
			return true
		})
		// rules
		de.rules = append(de.rules, rule.Id)
	} else {
		for _, gn := range ruleGraph.Nodes {
			switch gn.Type {
			case "source":
				sourceOption := &ast.Options{}
				err := cast.MapToStruct(gn.Props, sourceOption)
				if err != nil {
					break
				}
				sourceOption.TYPE = gn.NodeType
				de.sources = append(de.sources, sourceOption.TYPE)
				// get config key
				_, ok := de.sourceConfigKeys[sourceOption.TYPE]
				if ok {
					de.sourceConfigKeys[sourceOption.TYPE] = append(de.sourceConfigKeys[sourceOption.TYPE], sourceOption.CONF_KEY)
				} else {
					var confKeys []string
					confKeys = append(confKeys, sourceOption.CONF_KEY)
					de.sourceConfigKeys[sourceOption.TYPE] = confKeys
				}
				// get schema id
				if sourceOption.SCHEMAID != "" {
					r := strings.Split(sourceOption.SCHEMAID, ".")
					de.schemas = append(de.schemas, sourceOption.FORMAT+"_"+r[0])
				}
			case "sink":
				sinkType := gn.NodeType
				props := gn.Props
				de.sinks = append(de.sinks, sinkType)
				resourceId, ok := props[conf.ResourceID].(string)
				if ok {
					_, ok := de.sinkConfigKeys[sinkType]
					if ok {
						de.sinkConfigKeys[sinkType] = append(de.sinkConfigKeys[sinkType], resourceId)
					} else {
						var confKeys []string
						confKeys = append(confKeys, resourceId)
						de.sinkConfigKeys[sinkType] = confKeys
					}
				}
				format, ok := props["format"].(string)
				if ok && format != "json" {
					schemaId, ok := props["schemaId"].(string)
					if ok {
						r := strings.Split(schemaId, ".")
						de.schemas = append(de.schemas, format+"_"+r[0])
					}
				}
			case "operator":
				nt := strings.ToLower(gn.NodeType)
				switch nt {
				case "function":
					fop, err := parseFunc(gn.Props)
					if err != nil {
						break
					}
					ast.WalkFunc(fop, func(n ast.Node) bool {
						switch f := n.(type) {
						case *ast.Call:
							de.functions = append(de.functions, f.Name)
						}
						return true
					})
				case "aggfunc":
					fop, err := parseFunc(gn.Props)
					if err != nil {
						break
					}
					ast.WalkFunc(fop, func(n ast.Node) bool {
						switch f := n.(type) {
						case *ast.Call:
							de.functions = append(de.functions, f.Name)
						}
						return true
					})
				case "filter":
					fop, err := parseFilter(gn.Props)
					if err != nil {
						break
					}
					ast.WalkFunc(fop, func(n ast.Node) bool {
						switch f := n.(type) {
						case *ast.Call:
							de.functions = append(de.functions, f.Name)
						}
						return true
					})
				case "pick":
					pop, err := parsePick(gn.Props)
					if err != nil {
						break
					}
					ast.WalkFunc(pop, func(n ast.Node) bool {
						switch f := n.(type) {
						case *ast.Call:
							de.functions = append(de.functions, f.Name)
						}
						return true
					})
				case "join":
					jop, err := parseJoin(gn.Props)
					if err != nil {
						break
					}
					ast.WalkFunc(jop, func(n ast.Node) bool {
						switch f := n.(type) {
						case *ast.Call:
							de.functions = append(de.functions, f.Name)
						}
						return true
					})
				case "groupby":
					gop, err := parseGroupBy(gn.Props)
					if err != nil {
						break
					}
					ast.WalkFunc(gop, func(n ast.Node) bool {
						switch f := n.(type) {
						case *ast.Call:
							de.functions = append(de.functions, f.Name)
						}
						return true
					})
				case "orderby":
					oop, err := parseOrderBy(gn.Props)
					if err != nil {
						break
					}
					ast.WalkFunc(oop, func(n ast.Node) bool {
						switch f := n.(type) {
						case *ast.Call:
							de.functions = append(de.functions, f.Name)
						}
						return true
					})
				case "switch":
					opArray, err := parseSwitch(gn.Props)
					if err != nil {
						break
					}
					for _, op := range opArray {
						ast.WalkFunc(op, func(n ast.Node) bool {
							switch f := n.(type) {
							case *ast.Call:
								de.functions = append(de.functions, f.Name)
							}
							return true
						})
					}
				}
			default:
				break
			}
		}
	}
}
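
// ConfigurationPartialExport exports the given rules together with every
// resource they depend on (streams, tables, plugins, source/sink/connection
// configurations, services and schemas) as a JSON document.
//
// A minimal, hypothetical call site (ruleProcessor and streamProcessor are
// placeholders for the processors held by the server):
//
//	p := NewRuleMigrationProcessor(ruleProcessor, streamProcessor)
//	payload, err := p.ConfigurationPartialExport([]string{"rule1", "rule2"})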
func (p *RuleMigrationProcessor) ConfigurationPartialExport(rules []string) ([]byte, error) {
	config := &Configuration{
		Streams:          make(map[string]string),
		Tables:           make(map[string]string),
		Rules:            make(map[string]string),
		NativePlugins:    make(map[string]string),
		PortablePlugins:  make(map[string]string),
		SourceConfig:     make(map[string]string),
		SinkConfig:       make(map[string]string),
		ConnectionConfig: make(map[string]string),
		Service:          make(map[string]string),
		Schema:           make(map[string]string),
	}
	config.Rules = p.exportRules(rules)
	de := newDependencies()
	for _, v := range rules {
		rule, _ := p.r.GetRuleById(v)
		if rule != nil {
			ruleTraverse(rule, de)
		}
	}
	p.exportSelected(de, config)
	return json.Marshal(config)
}

func (p *RuleMigrationProcessor) exportRules(rules []string) map[string]string {
	ruleSet := make(map[string]string)
	for _, v := range rules {
		ruleJson, _ := p.r.GetRuleJson(v)
		ruleSet[v] = ruleJson
	}
	return ruleSet
}

func (p *RuleMigrationProcessor) exportStreams(streams []string) map[string]string {
	streamSet := make(map[string]string)
	for _, v := range streams {
		streamJson, _ := p.s.GetStream(v, ast.TypeStream)
		streamSet[v] = streamJson
	}
	return streamSet
}

func (p *RuleMigrationProcessor) exportTables(tables []string) map[string]string {
	tableSet := make(map[string]string)
	for _, v := range tables {
		tableJson, _ := p.s.GetStream(v, ast.TypeTable)
		tableSet[v] = tableJson
	}
	return tableSet
}
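
// exportSelected fills config with the dependencies collected in de: the
// stream/table definitions, the native/portable plugins and services backing
// the used sources, sinks and functions, the referenced source/sink/connection
// configuration keys, and the schema install scripts.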
func (p *RuleMigrationProcessor) exportSelected(de *dependencies, config *Configuration) {
	// get the streams and tables
	config.Streams = p.exportStreams(de.streams)
	config.Tables = p.exportTables(de.tables)
	// get the sources
	for _, v := range de.sources {
		t, srcName, srcInfo := io.GetSourcePlugin(v)
		if t == plugin.NATIVE_EXTENSION {
			config.NativePlugins[srcName] = srcInfo
		}
		if t == plugin.PORTABLE_EXTENSION {
			config.PortablePlugins[srcName] = srcInfo
		}
	}
	// get sinks
	for _, v := range de.sinks {
		t, sinkName, sinkInfo := io.GetSinkPlugin(v)
		if t == plugin.NATIVE_EXTENSION {
			config.NativePlugins[sinkName] = sinkInfo
		}
		if t == plugin.PORTABLE_EXTENSION {
			config.PortablePlugins[sinkName] = sinkInfo
		}
	}
	// get functions
	for _, v := range de.functions {
		t, svcName, svcInfo := function.GetFunctionPlugin(v)
		if t == plugin.NATIVE_EXTENSION {
			config.NativePlugins[svcName] = svcInfo
		}
		if t == plugin.PORTABLE_EXTENSION {
			config.PortablePlugins[svcName] = svcInfo
		}
		if t == plugin.SERVICE_EXTENSION {
			config.Service[svcName] = svcInfo
		}
	}
	// get sourceCfg/sinkCfg
	configKeys := meta.YamlConfigurationKeys{}
	configKeys.Sources = de.sourceConfigKeys
	configKeys.Sinks = de.sinkConfigKeys
	configSet := meta.GetConfigurationsFor(configKeys)
	config.SourceConfig = configSet.Sources
	config.SinkConfig = configSet.Sinks
	config.ConnectionConfig = configSet.Connections
	// get schemas
	for _, v := range de.schemas {
		schName, schInfo := getSchemaInstallScript(v)
		config.Schema[schName] = schInfo
	}
}
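
// The parse* helpers below rebuild an AST fragment from a graph node's props
// by wrapping the configured expression in a dummy statement (for example
// "select ... from nonexist") and parsing it, so that ruleTraverse can walk
// the result and collect the functions it references.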
func parsePick(props map[string]interface{}) (*ast.SelectStatement, error) {
	n := &graph.Select{}
	err := cast.MapToStruct(props, n)
	if err != nil {
		return nil, err
	}
	stmt, err := xsql.NewParser(strings.NewReader("select " + strings.Join(n.Fields, ",") + " from nonexist")).Parse()
	if err != nil {
		return nil, err
	} else {
		return stmt, nil
	}
}

func parseFunc(props map[string]interface{}) (*ast.SelectStatement, error) {
	m, ok := props["expr"]
	if !ok {
		return nil, errors.New("no expr")
	}
	funcExpr, ok := m.(string)
	if !ok {
		return nil, fmt.Errorf("expr %v is not string", m)
	}
	stmt, err := xsql.NewParser(strings.NewReader("select " + funcExpr + " from nonexist")).Parse()
	if err != nil {
		return nil, err
	} else {
		return stmt, nil
	}
}

func parseFilter(props map[string]interface{}) (ast.Expr, error) {
	m, ok := props["expr"]
	if !ok {
		return nil, errors.New("no expr")
	}
	conditionExpr, ok := m.(string)
	if !ok {
		return nil, fmt.Errorf("expr %v is not string", m)
	}
	p := xsql.NewParser(strings.NewReader(" where " + conditionExpr))
	if exp, err := p.ParseCondition(); err != nil {
		return nil, err
	} else {
		return exp, nil
	}
}

func parseHaving(props map[string]interface{}) (ast.Expr, error) {
	m, ok := props["expr"]
	if !ok {
		return nil, errors.New("no expr")
	}
	conditionExpr, ok := m.(string)
	if !ok {
		return nil, fmt.Errorf("expr %v is not string", m)
	}
	p := xsql.NewParser(strings.NewReader("where " + conditionExpr))
	if exp, err := p.ParseCondition(); err != nil {
		return nil, err
	} else {
		return exp, nil
	}
}

func parseSwitch(props map[string]interface{}) ([]ast.Expr, error) {
	n := &graph.Switch{}
	err := cast.MapToStruct(props, n)
	if err != nil {
		return nil, err
	}
	if len(n.Cases) == 0 {
		return nil, fmt.Errorf("switch node must have at least one case")
	}
	caseExprs := make([]ast.Expr, len(n.Cases))
	for i, c := range n.Cases {
		p := xsql.NewParser(strings.NewReader("where " + c))
		if exp, err := p.ParseCondition(); err != nil {
			return nil, fmt.Errorf("parse case %d error: %v", i, err)
		} else {
			if exp != nil {
				caseExprs[i] = exp
			}
		}
	}
	return caseExprs, nil
}

func parseOrderBy(props map[string]interface{}) (*ast.SelectStatement, error) {
	n := &graph.Orderby{}
	err := cast.MapToStruct(props, n)
	if err != nil {
		return nil, err
	}
	stmt := "SELECT * FROM unknown ORDER BY"
	for _, s := range n.Sorts {
		stmt += " " + s.Field + " "
		if s.Desc {
			stmt += "DESC"
		}
	}
	p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
	if err != nil {
		return nil, fmt.Errorf("invalid order by statement error: %v", err)
	} else {
		return p, nil
	}
}

func parseGroupBy(props map[string]interface{}) (*ast.SelectStatement, error) {
	n := &graph.Groupby{}
	err := cast.MapToStruct(props, n)
	if err != nil {
		return nil, err
	}
	if len(n.Dimensions) == 0 {
		return nil, fmt.Errorf("groupby must have at least one dimension")
	}
	stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
	p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
	if err != nil {
		return nil, fmt.Errorf("invalid group by statement error: %v", err)
	} else {
		return p, nil
	}
}

func parseJoin(props map[string]interface{}) (*ast.SelectStatement, error) {
	n := &graph.Join{}
	err := cast.MapToStruct(props, n)
	if err != nil {
		return nil, err
	}
	stmt := "SELECT * FROM " + n.From
	for _, join := range n.Joins {
		stmt += " " + join.Type + " JOIN ON " + join.On
	}
	p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
	if err != nil {
		return nil, fmt.Errorf("invalid join statement error: %v", err)
	} else {
		return p, nil
	}
}