rule_migration.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package processor
  15. import (
  16. "encoding/json"
  17. "github.com/lf-edge/ekuiper/internal/binder/function"
  18. "github.com/lf-edge/ekuiper/internal/binder/io"
  19. "github.com/lf-edge/ekuiper/internal/meta"
  20. store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
  21. "github.com/lf-edge/ekuiper/internal/plugin"
  22. "github.com/lf-edge/ekuiper/internal/schema"
  23. "github.com/lf-edge/ekuiper/internal/topo/node/conf"
  24. "github.com/lf-edge/ekuiper/internal/xsql"
  25. "github.com/lf-edge/ekuiper/pkg/api"
  26. "github.com/lf-edge/ekuiper/pkg/ast"
  27. "strings"
  28. )
  29. type RuleMigrationProcessor struct {
  30. r *RuleProcessor
  31. s *StreamProcessor
  32. }
  33. func NewRuleMigrationProcessor(r *RuleProcessor, s *StreamProcessor) *RuleMigrationProcessor {
  34. return &RuleMigrationProcessor{
  35. r: r,
  36. s: s,
  37. }
  38. }
  39. func NewDependencies() *Dependencies {
  40. return &Dependencies{
  41. SourceConfigKeys: map[string][]string{},
  42. SinkConfigKeys: map[string][]string{},
  43. }
  44. }
  45. // Dependencies copy all connections related configs by hardcode
  46. type Dependencies struct {
  47. Rules []string
  48. Streams []string
  49. Tables []string
  50. Sources []string
  51. Sinks []string
  52. SourceConfigKeys map[string][]string
  53. SinkConfigKeys map[string][]string
  54. Functions []string
  55. Schemas []string
  56. }
  57. func ruleTraverse(rule *api.Rule, de *Dependencies) {
  58. sql := rule.Sql
  59. if sql != "" {
  60. stmt, err := xsql.GetStatementFromSql(sql)
  61. if err != nil {
  62. return
  63. }
  64. err, store := store2.GetKV("stream")
  65. if err != nil {
  66. return
  67. }
  68. //streams
  69. streamsFromStmt := xsql.GetStreams(stmt)
  70. for _, s := range streamsFromStmt {
  71. streamStmt, err := xsql.GetDataSource(store, s)
  72. if err != nil {
  73. continue
  74. }
  75. if streamStmt.StreamType == ast.TypeStream {
  76. //get streams
  77. de.Streams = append(de.Streams, string(streamStmt.Name))
  78. } else if streamStmt.StreamType == ast.TypeTable {
  79. //get tables
  80. de.Tables = append(de.Tables, string(streamStmt.Name))
  81. }
  82. //get source type
  83. de.Sources = append(de.Sources, streamStmt.Options.TYPE)
  84. //get config key
  85. _, ok := de.SourceConfigKeys[streamStmt.Options.TYPE]
  86. if ok {
  87. de.SourceConfigKeys[streamStmt.Options.TYPE] = append(de.SourceConfigKeys[streamStmt.Options.TYPE], streamStmt.Options.CONF_KEY)
  88. } else {
  89. var confKeys []string
  90. confKeys = append(confKeys, streamStmt.Options.CONF_KEY)
  91. de.SourceConfigKeys[streamStmt.Options.TYPE] = confKeys
  92. }
  93. //get schema id
  94. if streamStmt.Options.SCHEMAID != "" {
  95. r := strings.Split(streamStmt.Options.SCHEMAID, ".")
  96. de.Schemas = append(de.Schemas, streamStmt.Options.FORMAT+"_"+r[0])
  97. }
  98. }
  99. //actions
  100. for _, m := range rule.Actions {
  101. for name, action := range m {
  102. props, _ := action.(map[string]interface{})
  103. de.Sinks = append(de.Sinks, name)
  104. resourceId, ok := props[conf.ResourceID].(string)
  105. if ok {
  106. _, ok := de.SinkConfigKeys[name]
  107. if ok {
  108. de.SinkConfigKeys[name] = append(de.SinkConfigKeys[name], resourceId)
  109. } else {
  110. var confKeys []string
  111. confKeys = append(confKeys, resourceId)
  112. de.SinkConfigKeys[name] = confKeys
  113. }
  114. }
  115. format, ok := props["format"].(string)
  116. if ok && format != "json" {
  117. schemaId, ok := props["schemaId"].(string)
  118. if ok {
  119. r := strings.Split(schemaId, ".")
  120. de.Schemas = append(de.Schemas, format+"_"+r[0])
  121. }
  122. }
  123. }
  124. }
  125. // function
  126. ast.WalkFunc(stmt, func(n ast.Node) bool {
  127. switch f := n.(type) {
  128. case *ast.Call:
  129. de.Functions = append(de.Functions, f.Name)
  130. }
  131. return true
  132. })
  133. //Rules
  134. de.Rules = append(de.Rules, rule.Id)
  135. }
  136. }
  137. type Configuration struct {
  138. Streams map[string]string `json:"streams"`
  139. Tables map[string]string `json:"tables"`
  140. Rules map[string]string `json:"rules"`
  141. NativePlugins map[string]string `json:"nativePlugins"`
  142. PortablePlugins map[string]string `json:"portablePlugins"`
  143. SourceConfig map[string]string `json:"sourceConfig"`
  144. SinkConfig map[string]string `json:"sinkConfig"`
  145. ConnectionConfig map[string]string `json:"connectionConfig"`
  146. Service map[string]string `json:"Service"`
  147. Schema map[string]string `json:"Schema"`
  148. }
  149. func (p *RuleMigrationProcessor) ConfigurationPartialExport(rules []string) ([]byte, error) {
  150. conf := &Configuration{
  151. Streams: make(map[string]string),
  152. Tables: make(map[string]string),
  153. Rules: make(map[string]string),
  154. NativePlugins: make(map[string]string),
  155. PortablePlugins: make(map[string]string),
  156. SourceConfig: make(map[string]string),
  157. SinkConfig: make(map[string]string),
  158. ConnectionConfig: make(map[string]string),
  159. Service: make(map[string]string),
  160. Schema: make(map[string]string),
  161. }
  162. conf.Rules = p.exportRules(rules)
  163. de := NewDependencies()
  164. for _, v := range rules {
  165. rule, _ := p.r.GetRuleById(v)
  166. if rule != nil {
  167. ruleTraverse(rule, de)
  168. }
  169. }
  170. p.exportSelected(de, conf)
  171. return json.Marshal(conf)
  172. }
  173. func (p *RuleMigrationProcessor) exportRules(rules []string) map[string]string {
  174. ruleSet := make(map[string]string)
  175. for _, v := range rules {
  176. ruleJson, _ := p.r.GetRuleJson(v)
  177. ruleSet[v] = ruleJson
  178. }
  179. return ruleSet
  180. }
  181. func (p *RuleMigrationProcessor) exportStreams(streams []string) map[string]string {
  182. streamSet := make(map[string]string)
  183. for _, v := range streams {
  184. streamJson, _ := p.s.GetStream(v, ast.TypeStream)
  185. streamSet[v] = streamJson
  186. }
  187. return streamSet
  188. }
  189. func (p *RuleMigrationProcessor) exportTables(tables []string) map[string]string {
  190. tableSet := make(map[string]string)
  191. for _, v := range tables {
  192. tableJson, _ := p.s.GetStream(v, ast.TypeTable)
  193. tableSet[v] = tableJson
  194. }
  195. return tableSet
  196. }
  197. func (p *RuleMigrationProcessor) exportSelected(de *Dependencies, config *Configuration) {
  198. //get the stream and table
  199. config.Streams = p.exportStreams(de.Streams)
  200. config.Tables = p.exportTables(de.Tables)
  201. //get the sources
  202. for _, v := range de.Sources {
  203. t, srcName, srcInfo := io.GetSourcePlugin(v)
  204. if t == plugin.NATIVE_EXTENSION {
  205. config.NativePlugins[srcName] = srcInfo
  206. }
  207. if t == plugin.PORTABLE_EXTENSION {
  208. config.PortablePlugins[srcName] = srcInfo
  209. }
  210. }
  211. // get sinks
  212. for _, v := range de.Sinks {
  213. t, sinkName, sinkInfo := io.GetSinkPlugin(v)
  214. if t == plugin.NATIVE_EXTENSION {
  215. config.NativePlugins[sinkName] = sinkInfo
  216. }
  217. if t == plugin.PORTABLE_EXTENSION {
  218. config.PortablePlugins[sinkName] = sinkInfo
  219. }
  220. }
  221. // get functions
  222. for _, v := range de.Functions {
  223. t, svcName, svcInfo := function.GetFunctionPlugin(v)
  224. if t == plugin.NATIVE_EXTENSION {
  225. config.NativePlugins[svcName] = svcInfo
  226. }
  227. if t == plugin.PORTABLE_EXTENSION {
  228. config.PortablePlugins[svcName] = svcInfo
  229. }
  230. if t == plugin.SERVICE_EXTENSION {
  231. config.Service[svcName] = svcInfo
  232. }
  233. }
  234. // get sourceCfg/sinkCfg
  235. configKeys := meta.YamlConfigurationKeys{}
  236. configKeys.Sources = de.SourceConfigKeys
  237. configKeys.Sinks = de.SinkConfigKeys
  238. configSet := meta.GetConfigurationsFor(configKeys)
  239. config.SourceConfig = configSet.Sources
  240. config.SinkConfig = configSet.Sinks
  241. config.ConnectionConfig = configSet.Connections
  242. //get schema
  243. for _, v := range de.Schemas {
  244. schName, schInfo := schema.GetSchemaInstallScript(v)
  245. config.Schema[schName] = schInfo
  246. }
  247. }