rule_migration.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/binder/function"
  20. "github.com/lf-edge/ekuiper/internal/binder/io"
  21. "github.com/lf-edge/ekuiper/internal/meta"
  22. store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
  23. "github.com/lf-edge/ekuiper/internal/plugin"
  24. "github.com/lf-edge/ekuiper/internal/processor"
  25. "github.com/lf-edge/ekuiper/internal/topo/graph"
  26. "github.com/lf-edge/ekuiper/internal/topo/node/conf"
  27. "github.com/lf-edge/ekuiper/internal/xsql"
  28. "github.com/lf-edge/ekuiper/pkg/api"
  29. "github.com/lf-edge/ekuiper/pkg/ast"
  30. "github.com/lf-edge/ekuiper/pkg/cast"
  31. "strings"
  32. )
  33. type RuleMigrationProcessor struct {
  34. r *processor.RuleProcessor
  35. s *processor.StreamProcessor
  36. }
  37. func NewRuleMigrationProcessor(r *processor.RuleProcessor, s *processor.StreamProcessor) *RuleMigrationProcessor {
  38. return &RuleMigrationProcessor{
  39. r: r,
  40. s: s,
  41. }
  42. }
  43. func newDependencies() *dependencies {
  44. return &dependencies{
  45. sourceConfigKeys: map[string][]string{},
  46. sinkConfigKeys: map[string][]string{},
  47. }
  48. }
  49. // dependencies copy all connections related configs by hardcode
  50. type dependencies struct {
  51. rules []string
  52. streams []string
  53. tables []string
  54. sources []string
  55. sinks []string
  56. sourceConfigKeys map[string][]string
  57. sinkConfigKeys map[string][]string
  58. functions []string
  59. schemas []string
  60. }
  61. func ruleTraverse(rule *api.Rule, de *dependencies) {
  62. sql := rule.Sql
  63. if sql != "" {
  64. stmt, err := xsql.GetStatementFromSql(sql)
  65. if err != nil {
  66. return
  67. }
  68. err, store := store2.GetKV("stream")
  69. if err != nil {
  70. return
  71. }
  72. //streams
  73. streamsFromStmt := xsql.GetStreams(stmt)
  74. for _, s := range streamsFromStmt {
  75. streamStmt, err := xsql.GetDataSource(store, s)
  76. if err != nil {
  77. continue
  78. }
  79. if streamStmt.StreamType == ast.TypeStream {
  80. //get streams
  81. de.streams = append(de.streams, string(streamStmt.Name))
  82. } else if streamStmt.StreamType == ast.TypeTable {
  83. //get tables
  84. de.tables = append(de.tables, string(streamStmt.Name))
  85. }
  86. //get source type
  87. de.sources = append(de.sources, streamStmt.Options.TYPE)
  88. //get config key
  89. _, ok := de.sourceConfigKeys[streamStmt.Options.TYPE]
  90. if ok {
  91. de.sourceConfigKeys[streamStmt.Options.TYPE] = append(de.sourceConfigKeys[streamStmt.Options.TYPE], streamStmt.Options.CONF_KEY)
  92. } else {
  93. var confKeys []string
  94. confKeys = append(confKeys, streamStmt.Options.CONF_KEY)
  95. de.sourceConfigKeys[streamStmt.Options.TYPE] = confKeys
  96. }
  97. //get schema id
  98. if streamStmt.Options.SCHEMAID != "" {
  99. r := strings.Split(streamStmt.Options.SCHEMAID, ".")
  100. de.schemas = append(de.schemas, streamStmt.Options.FORMAT+"_"+r[0])
  101. }
  102. }
  103. //actions
  104. for _, m := range rule.Actions {
  105. for name, action := range m {
  106. props, _ := action.(map[string]interface{})
  107. de.sinks = append(de.sinks, name)
  108. resourceId, ok := props[conf.ResourceID].(string)
  109. if ok {
  110. _, ok := de.sinkConfigKeys[name]
  111. if ok {
  112. de.sinkConfigKeys[name] = append(de.sinkConfigKeys[name], resourceId)
  113. } else {
  114. var confKeys []string
  115. confKeys = append(confKeys, resourceId)
  116. de.sinkConfigKeys[name] = confKeys
  117. }
  118. }
  119. format, ok := props["format"].(string)
  120. if ok && format != "json" {
  121. schemaId, ok := props["schemaId"].(string)
  122. if ok {
  123. r := strings.Split(schemaId, ".")
  124. de.schemas = append(de.schemas, format+"_"+r[0])
  125. }
  126. }
  127. }
  128. }
  129. // function
  130. ast.WalkFunc(stmt, func(n ast.Node) bool {
  131. switch f := n.(type) {
  132. case *ast.Call:
  133. de.functions = append(de.functions, f.Name)
  134. }
  135. return true
  136. })
  137. //Rules
  138. de.rules = append(de.rules, rule.Id)
  139. }
  140. ruleGraph := rule.Graph
  141. if ruleGraph != nil {
  142. for _, gn := range ruleGraph.Nodes {
  143. switch gn.Type {
  144. case "source":
  145. sourceOption := &ast.Options{}
  146. err := cast.MapToStruct(gn.Props, sourceOption)
  147. if err != nil {
  148. break
  149. }
  150. sourceOption.TYPE = gn.NodeType
  151. de.sources = append(de.sources, sourceOption.TYPE)
  152. //get config key
  153. _, ok := de.sourceConfigKeys[sourceOption.TYPE]
  154. if ok {
  155. de.sourceConfigKeys[sourceOption.TYPE] = append(de.sourceConfigKeys[sourceOption.TYPE], sourceOption.CONF_KEY)
  156. } else {
  157. var confKeys []string
  158. confKeys = append(confKeys, sourceOption.CONF_KEY)
  159. de.sourceConfigKeys[sourceOption.TYPE] = confKeys
  160. }
  161. //get schema id
  162. if sourceOption.SCHEMAID != "" {
  163. r := strings.Split(sourceOption.SCHEMAID, ".")
  164. de.schemas = append(de.schemas, sourceOption.FORMAT+"_"+r[0])
  165. }
  166. case "sink":
  167. sinkType := gn.NodeType
  168. props := gn.Props
  169. de.sinks = append(de.sinks, sinkType)
  170. resourceId, ok := props[conf.ResourceID].(string)
  171. if ok {
  172. _, ok := de.sinkConfigKeys[sinkType]
  173. if ok {
  174. de.sinkConfigKeys[sinkType] = append(de.sinkConfigKeys[sinkType], resourceId)
  175. } else {
  176. var confKeys []string
  177. confKeys = append(confKeys, resourceId)
  178. de.sinkConfigKeys[sinkType] = confKeys
  179. }
  180. }
  181. format, ok := props["format"].(string)
  182. if ok && format != "json" {
  183. schemaId, ok := props["schemaId"].(string)
  184. if ok {
  185. r := strings.Split(schemaId, ".")
  186. de.schemas = append(de.schemas, format+"_"+r[0])
  187. }
  188. }
  189. case "operator":
  190. nt := strings.ToLower(gn.NodeType)
  191. switch nt {
  192. case "function":
  193. fop, err := parseFunc(gn.Props)
  194. if err != nil {
  195. break
  196. }
  197. ast.WalkFunc(fop, func(n ast.Node) bool {
  198. switch f := n.(type) {
  199. case *ast.Call:
  200. de.functions = append(de.functions, f.Name)
  201. }
  202. return true
  203. })
  204. case "aggfunc":
  205. fop, err := parseFunc(gn.Props)
  206. if err != nil {
  207. break
  208. }
  209. ast.WalkFunc(fop, func(n ast.Node) bool {
  210. switch f := n.(type) {
  211. case *ast.Call:
  212. de.functions = append(de.functions, f.Name)
  213. }
  214. return true
  215. })
  216. case "filter":
  217. fop, err := parseFilter(gn.Props)
  218. if err != nil {
  219. break
  220. }
  221. ast.WalkFunc(fop, func(n ast.Node) bool {
  222. switch f := n.(type) {
  223. case *ast.Call:
  224. de.functions = append(de.functions, f.Name)
  225. }
  226. return true
  227. })
  228. case "pick":
  229. pop, err := parsePick(gn.Props)
  230. if err != nil {
  231. break
  232. }
  233. ast.WalkFunc(pop, func(n ast.Node) bool {
  234. switch f := n.(type) {
  235. case *ast.Call:
  236. de.functions = append(de.functions, f.Name)
  237. }
  238. return true
  239. })
  240. case "join":
  241. jop, err := parseJoin(gn.Props)
  242. if err != nil {
  243. break
  244. }
  245. ast.WalkFunc(jop, func(n ast.Node) bool {
  246. switch f := n.(type) {
  247. case *ast.Call:
  248. de.functions = append(de.functions, f.Name)
  249. }
  250. return true
  251. })
  252. case "groupby":
  253. gop, err := parseGroupBy(gn.Props)
  254. if err != nil {
  255. break
  256. }
  257. ast.WalkFunc(gop, func(n ast.Node) bool {
  258. switch f := n.(type) {
  259. case *ast.Call:
  260. de.functions = append(de.functions, f.Name)
  261. }
  262. return true
  263. })
  264. case "orderby":
  265. oop, err := parseOrderBy(gn.Props)
  266. if err != nil {
  267. break
  268. }
  269. ast.WalkFunc(oop, func(n ast.Node) bool {
  270. switch f := n.(type) {
  271. case *ast.Call:
  272. de.functions = append(de.functions, f.Name)
  273. }
  274. return true
  275. })
  276. case "switch":
  277. opArray, err := parseSwitch(gn.Props)
  278. if err != nil {
  279. break
  280. }
  281. for _, op := range opArray {
  282. ast.WalkFunc(op, func(n ast.Node) bool {
  283. switch f := n.(type) {
  284. case *ast.Call:
  285. de.functions = append(de.functions, f.Name)
  286. }
  287. return true
  288. })
  289. }
  290. }
  291. default:
  292. break
  293. }
  294. }
  295. }
  296. }
  297. func (p *RuleMigrationProcessor) ConfigurationPartialExport(rules []string) ([]byte, error) {
  298. config := &Configuration{
  299. Streams: make(map[string]string),
  300. Tables: make(map[string]string),
  301. Rules: make(map[string]string),
  302. NativePlugins: make(map[string]string),
  303. PortablePlugins: make(map[string]string),
  304. SourceConfig: make(map[string]string),
  305. SinkConfig: make(map[string]string),
  306. ConnectionConfig: make(map[string]string),
  307. Service: make(map[string]string),
  308. Schema: make(map[string]string),
  309. }
  310. config.Rules = p.exportRules(rules)
  311. de := newDependencies()
  312. for _, v := range rules {
  313. rule, _ := p.r.GetRuleById(v)
  314. if rule != nil {
  315. ruleTraverse(rule, de)
  316. }
  317. }
  318. p.exportSelected(de, config)
  319. return json.Marshal(config)
  320. }
  321. func (p *RuleMigrationProcessor) exportRules(rules []string) map[string]string {
  322. ruleSet := make(map[string]string)
  323. for _, v := range rules {
  324. ruleJson, _ := p.r.GetRuleJson(v)
  325. ruleSet[v] = ruleJson
  326. }
  327. return ruleSet
  328. }
  329. func (p *RuleMigrationProcessor) exportStreams(streams []string) map[string]string {
  330. streamSet := make(map[string]string)
  331. for _, v := range streams {
  332. streamJson, _ := p.s.GetStream(v, ast.TypeStream)
  333. streamSet[v] = streamJson
  334. }
  335. return streamSet
  336. }
  337. func (p *RuleMigrationProcessor) exportTables(tables []string) map[string]string {
  338. tableSet := make(map[string]string)
  339. for _, v := range tables {
  340. tableJson, _ := p.s.GetStream(v, ast.TypeTable)
  341. tableSet[v] = tableJson
  342. }
  343. return tableSet
  344. }
  345. func (p *RuleMigrationProcessor) exportSelected(de *dependencies, config *Configuration) {
  346. //get the stream and table
  347. config.Streams = p.exportStreams(de.streams)
  348. config.Tables = p.exportTables(de.tables)
  349. //get the sources
  350. for _, v := range de.sources {
  351. t, srcName, srcInfo := io.GetSourcePlugin(v)
  352. if t == plugin.NATIVE_EXTENSION {
  353. config.NativePlugins[srcName] = srcInfo
  354. }
  355. if t == plugin.PORTABLE_EXTENSION {
  356. config.PortablePlugins[srcName] = srcInfo
  357. }
  358. }
  359. // get sinks
  360. for _, v := range de.sinks {
  361. t, sinkName, sinkInfo := io.GetSinkPlugin(v)
  362. if t == plugin.NATIVE_EXTENSION {
  363. config.NativePlugins[sinkName] = sinkInfo
  364. }
  365. if t == plugin.PORTABLE_EXTENSION {
  366. config.PortablePlugins[sinkName] = sinkInfo
  367. }
  368. }
  369. // get functions
  370. for _, v := range de.functions {
  371. t, svcName, svcInfo := function.GetFunctionPlugin(v)
  372. if t == plugin.NATIVE_EXTENSION {
  373. config.NativePlugins[svcName] = svcInfo
  374. }
  375. if t == plugin.PORTABLE_EXTENSION {
  376. config.PortablePlugins[svcName] = svcInfo
  377. }
  378. if t == plugin.SERVICE_EXTENSION {
  379. config.Service[svcName] = svcInfo
  380. }
  381. }
  382. // get sourceCfg/sinkCfg
  383. configKeys := meta.YamlConfigurationKeys{}
  384. configKeys.Sources = de.sourceConfigKeys
  385. configKeys.Sinks = de.sinkConfigKeys
  386. configSet := meta.GetConfigurationsFor(configKeys)
  387. config.SourceConfig = configSet.Sources
  388. config.SinkConfig = configSet.Sinks
  389. config.ConnectionConfig = configSet.Connections
  390. //get schema
  391. for _, v := range de.schemas {
  392. schName, schInfo := getSchemaInstallScript(v)
  393. config.Schema[schName] = schInfo
  394. }
  395. }
  396. func parsePick(props map[string]interface{}) (*ast.SelectStatement, error) {
  397. n := &graph.Select{}
  398. err := cast.MapToStruct(props, n)
  399. if err != nil {
  400. return nil, err
  401. }
  402. stmt, err := xsql.NewParser(strings.NewReader("select " + strings.Join(n.Fields, ",") + " from nonexist")).Parse()
  403. if err != nil {
  404. return nil, err
  405. } else {
  406. return stmt, nil
  407. }
  408. }
  409. func parseFunc(props map[string]interface{}) (*ast.SelectStatement, error) {
  410. m, ok := props["expr"]
  411. if !ok {
  412. return nil, errors.New("no expr")
  413. }
  414. funcExpr, ok := m.(string)
  415. if !ok {
  416. return nil, fmt.Errorf("expr %v is not string", m)
  417. }
  418. stmt, err := xsql.NewParser(strings.NewReader("select " + funcExpr + " from nonexist")).Parse()
  419. if err != nil {
  420. return nil, err
  421. } else {
  422. return stmt, nil
  423. }
  424. }
  425. func parseFilter(props map[string]interface{}) (ast.Expr, error) {
  426. m, ok := props["expr"]
  427. if !ok {
  428. return nil, errors.New("no expr")
  429. }
  430. conditionExpr, ok := m.(string)
  431. if !ok {
  432. return nil, fmt.Errorf("expr %v is not string", m)
  433. }
  434. p := xsql.NewParser(strings.NewReader(" where " + conditionExpr))
  435. if exp, err := p.ParseCondition(); err != nil {
  436. return nil, err
  437. } else {
  438. return exp, nil
  439. }
  440. }
  441. func parseHaving(props map[string]interface{}) (ast.Expr, error) {
  442. m, ok := props["expr"]
  443. if !ok {
  444. return nil, errors.New("no expr")
  445. }
  446. conditionExpr, ok := m.(string)
  447. if !ok {
  448. return nil, fmt.Errorf("expr %v is not string", m)
  449. }
  450. p := xsql.NewParser(strings.NewReader("where " + conditionExpr))
  451. if exp, err := p.ParseCondition(); err != nil {
  452. return nil, err
  453. } else {
  454. return exp, nil
  455. }
  456. }
  457. func parseSwitch(props map[string]interface{}) ([]ast.Expr, error) {
  458. n := &graph.Switch{}
  459. err := cast.MapToStruct(props, n)
  460. if err != nil {
  461. return nil, err
  462. }
  463. if len(n.Cases) == 0 {
  464. return nil, fmt.Errorf("switch node must have at least one case")
  465. }
  466. caseExprs := make([]ast.Expr, len(n.Cases))
  467. for i, c := range n.Cases {
  468. p := xsql.NewParser(strings.NewReader("where " + c))
  469. if exp, err := p.ParseCondition(); err != nil {
  470. return nil, fmt.Errorf("parse case %d error: %v", i, err)
  471. } else {
  472. if exp != nil {
  473. caseExprs[i] = exp
  474. }
  475. }
  476. }
  477. return caseExprs, nil
  478. }
  479. func parseOrderBy(props map[string]interface{}) (*ast.SelectStatement, error) {
  480. n := &graph.Orderby{}
  481. err := cast.MapToStruct(props, n)
  482. if err != nil {
  483. return nil, err
  484. }
  485. stmt := "SELECT * FROM unknown ORDER BY"
  486. for _, s := range n.Sorts {
  487. stmt += " " + s.Field + " "
  488. if s.Desc {
  489. stmt += "DESC"
  490. }
  491. }
  492. p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
  493. if err != nil {
  494. return nil, fmt.Errorf("invalid order by statement error: %v", err)
  495. } else {
  496. return p, nil
  497. }
  498. }
  499. func parseGroupBy(props map[string]interface{}) (*ast.SelectStatement, error) {
  500. n := &graph.Groupby{}
  501. err := cast.MapToStruct(props, n)
  502. if err != nil {
  503. return nil, err
  504. }
  505. if len(n.Dimensions) == 0 {
  506. return nil, fmt.Errorf("groupby must have at least one dimension")
  507. }
  508. stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
  509. p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
  510. if err != nil {
  511. return nil, fmt.Errorf("invalid join statement error: %v", err)
  512. } else {
  513. return p, nil
  514. }
  515. }
  516. func parseJoin(props map[string]interface{}) (*ast.SelectStatement, error) {
  517. n := &graph.Join{}
  518. err := cast.MapToStruct(props, n)
  519. if err != nil {
  520. return nil, err
  521. }
  522. stmt := "SELECT * FROM " + n.From
  523. for _, join := range n.Joins {
  524. stmt += " " + join.Type + " JOIN ON " + join.On
  525. }
  526. p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
  527. if err != nil {
  528. return nil, fmt.Errorf("invalid join statement error: %v", err)
  529. } else {
  530. return p, nil
  531. }
  532. }