|
@@ -0,0 +1,273 @@
|
|
|
+// Copyright 2023 EMQ Technologies Co., Ltd.
|
|
|
+//
|
|
|
+// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
+// you may not use this file except in compliance with the License.
|
|
|
+// You may obtain a copy of the License at
|
|
|
+//
|
|
|
+// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+//
|
|
|
+// Unless required by applicable law or agreed to in writing, software
|
|
|
+// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+// See the License for the specific language governing permissions and
|
|
|
+// limitations under the License.
|
|
|
+
|
|
|
+package processor
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/binder/function"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/binder/io"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/meta"
|
|
|
+ store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/plugin"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/schema"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/topo/node/conf"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/xsql"
|
|
|
+ "github.com/lf-edge/ekuiper/pkg/api"
|
|
|
+ "github.com/lf-edge/ekuiper/pkg/ast"
|
|
|
+ "strings"
|
|
|
+)
|
|
|
+
|
|
|
+type RuleMigrationProcessor struct {
|
|
|
+ r *RuleProcessor
|
|
|
+ s *StreamProcessor
|
|
|
+}
|
|
|
+
|
|
|
+func NewRuleMigrationProcessor(r *RuleProcessor, s *StreamProcessor) *RuleMigrationProcessor {
|
|
|
+ return &RuleMigrationProcessor{
|
|
|
+ r: r,
|
|
|
+ s: s,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func NewDependencies() *Dependencies {
|
|
|
+ return &Dependencies{
|
|
|
+ SourceConfigKeys: map[string][]string{},
|
|
|
+ SinkConfigKeys: map[string][]string{},
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// Dependencies copy all connections related configs by hardcode
|
|
|
+type Dependencies struct {
|
|
|
+ Rules []string
|
|
|
+ Streams []string
|
|
|
+ Tables []string
|
|
|
+ Sources []string
|
|
|
+ Sinks []string
|
|
|
+ SourceConfigKeys map[string][]string
|
|
|
+ SinkConfigKeys map[string][]string
|
|
|
+ Functions []string
|
|
|
+ Schemas []string
|
|
|
+}
|
|
|
+
|
|
|
+func ruleTraverse(rule *api.Rule, de *Dependencies) {
|
|
|
+ sql := rule.Sql
|
|
|
+ if sql != "" {
|
|
|
+ stmt, err := xsql.GetStatementFromSql(sql)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ err, store := store2.GetKV("stream")
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ //streams
|
|
|
+ streamsFromStmt := xsql.GetStreams(stmt)
|
|
|
+ for _, s := range streamsFromStmt {
|
|
|
+ streamStmt, err := xsql.GetDataSource(store, s)
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if streamStmt.StreamType == ast.TypeStream {
|
|
|
+ //get streams
|
|
|
+ de.Streams = append(de.Streams, string(streamStmt.Name))
|
|
|
+ } else if streamStmt.StreamType == ast.TypeTable {
|
|
|
+ //get tables
|
|
|
+ de.Tables = append(de.Tables, string(streamStmt.Name))
|
|
|
+ }
|
|
|
+
|
|
|
+ //get source type
|
|
|
+ de.Sources = append(de.Sources, streamStmt.Options.TYPE)
|
|
|
+ //get config key
|
|
|
+ _, ok := de.SourceConfigKeys[streamStmt.Options.TYPE]
|
|
|
+ if ok {
|
|
|
+ de.SourceConfigKeys[streamStmt.Options.TYPE] = append(de.SourceConfigKeys[streamStmt.Options.TYPE], streamStmt.Options.CONF_KEY)
|
|
|
+ } else {
|
|
|
+ var confKeys []string
|
|
|
+ confKeys = append(confKeys, streamStmt.Options.CONF_KEY)
|
|
|
+ de.SourceConfigKeys[streamStmt.Options.TYPE] = confKeys
|
|
|
+ }
|
|
|
+
|
|
|
+ //get schema id
|
|
|
+ if streamStmt.Options.SCHEMAID != "" {
|
|
|
+ r := strings.Split(streamStmt.Options.SCHEMAID, ".")
|
|
|
+ de.Schemas = append(de.Schemas, streamStmt.Options.FORMAT+"_"+r[0])
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //actions
|
|
|
+ for _, m := range rule.Actions {
|
|
|
+ for name, action := range m {
|
|
|
+ props, _ := action.(map[string]interface{})
|
|
|
+ de.Sinks = append(de.Sinks, name)
|
|
|
+ resourceId, ok := props[conf.ResourceID].(string)
|
|
|
+ if ok {
|
|
|
+ _, ok := de.SinkConfigKeys[name]
|
|
|
+ if ok {
|
|
|
+ de.SinkConfigKeys[name] = append(de.SinkConfigKeys[name], resourceId)
|
|
|
+ } else {
|
|
|
+ var confKeys []string
|
|
|
+ confKeys = append(confKeys, resourceId)
|
|
|
+ de.SinkConfigKeys[name] = confKeys
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ format, ok := props["format"].(string)
|
|
|
+ if ok && format != "json" {
|
|
|
+ schemaId, ok := props["schemaId"].(string)
|
|
|
+ if ok {
|
|
|
+ r := strings.Split(schemaId, ".")
|
|
|
+ de.Schemas = append(de.Schemas, format+"_"+r[0])
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // function
|
|
|
+ ast.WalkFunc(stmt, func(n ast.Node) bool {
|
|
|
+ switch f := n.(type) {
|
|
|
+ case *ast.Call:
|
|
|
+ de.Functions = append(de.Functions, f.Name)
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ })
|
|
|
+
|
|
|
+ //Rules
|
|
|
+ de.Rules = append(de.Rules, rule.Id)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+type Configuration struct {
|
|
|
+ Streams map[string]string `json:"streams"`
|
|
|
+ Tables map[string]string `json:"tables"`
|
|
|
+ Rules map[string]string `json:"rules"`
|
|
|
+ NativePlugins map[string]string `json:"nativePlugins"`
|
|
|
+ PortablePlugins map[string]string `json:"portablePlugins"`
|
|
|
+ SourceConfig map[string]string `json:"sourceConfig"`
|
|
|
+ SinkConfig map[string]string `json:"sinkConfig"`
|
|
|
+ ConnectionConfig map[string]string `json:"connectionConfig"`
|
|
|
+ Service map[string]string `json:"Service"`
|
|
|
+ Schema map[string]string `json:"Schema"`
|
|
|
+}
|
|
|
+
|
|
|
+func (p *RuleMigrationProcessor) ConfigurationPartialExport(rules []string) ([]byte, error) {
|
|
|
+ conf := &Configuration{
|
|
|
+ Streams: make(map[string]string),
|
|
|
+ Tables: make(map[string]string),
|
|
|
+ Rules: make(map[string]string),
|
|
|
+ NativePlugins: make(map[string]string),
|
|
|
+ PortablePlugins: make(map[string]string),
|
|
|
+ SourceConfig: make(map[string]string),
|
|
|
+ SinkConfig: make(map[string]string),
|
|
|
+ ConnectionConfig: make(map[string]string),
|
|
|
+ Service: make(map[string]string),
|
|
|
+ Schema: make(map[string]string),
|
|
|
+ }
|
|
|
+ conf.Rules = p.exportRules(rules)
|
|
|
+
|
|
|
+ de := NewDependencies()
|
|
|
+ for _, v := range rules {
|
|
|
+ rule, _ := p.r.GetRuleById(v)
|
|
|
+ if rule != nil {
|
|
|
+ ruleTraverse(rule, de)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ p.exportSelected(de, conf)
|
|
|
+
|
|
|
+ return json.Marshal(conf)
|
|
|
+}
|
|
|
+
|
|
|
+func (p *RuleMigrationProcessor) exportRules(rules []string) map[string]string {
|
|
|
+ ruleSet := make(map[string]string)
|
|
|
+
|
|
|
+ for _, v := range rules {
|
|
|
+ ruleJson, _ := p.r.GetRuleJson(v)
|
|
|
+ ruleSet[v] = ruleJson
|
|
|
+ }
|
|
|
+ return ruleSet
|
|
|
+}
|
|
|
+
|
|
|
+func (p *RuleMigrationProcessor) exportStreams(streams []string) map[string]string {
|
|
|
+ streamSet := make(map[string]string)
|
|
|
+
|
|
|
+ for _, v := range streams {
|
|
|
+ streamJson, _ := p.s.GetStream(v, ast.TypeStream)
|
|
|
+ streamSet[v] = streamJson
|
|
|
+ }
|
|
|
+ return streamSet
|
|
|
+}
|
|
|
+
|
|
|
+func (p *RuleMigrationProcessor) exportTables(tables []string) map[string]string {
|
|
|
+ tableSet := make(map[string]string)
|
|
|
+
|
|
|
+ for _, v := range tables {
|
|
|
+ tableJson, _ := p.s.GetStream(v, ast.TypeTable)
|
|
|
+ tableSet[v] = tableJson
|
|
|
+ }
|
|
|
+ return tableSet
|
|
|
+}
|
|
|
+
|
|
|
+func (p *RuleMigrationProcessor) exportSelected(de *Dependencies, config *Configuration) {
|
|
|
+ //get the stream and table
|
|
|
+ config.Streams = p.exportStreams(de.Streams)
|
|
|
+ config.Tables = p.exportTables(de.Tables)
|
|
|
+ //get the sources
|
|
|
+ for _, v := range de.Sources {
|
|
|
+ t, srcName, srcInfo := io.GetSourcePlugin(v)
|
|
|
+ if t == plugin.NATIVE_EXTENSION {
|
|
|
+ config.NativePlugins[srcName] = srcInfo
|
|
|
+ }
|
|
|
+ if t == plugin.PORTABLE_EXTENSION {
|
|
|
+ config.PortablePlugins[srcName] = srcInfo
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // get sinks
|
|
|
+ for _, v := range de.Sinks {
|
|
|
+ t, sinkName, sinkInfo := io.GetSinkPlugin(v)
|
|
|
+ if t == plugin.NATIVE_EXTENSION {
|
|
|
+ config.NativePlugins[sinkName] = sinkInfo
|
|
|
+ }
|
|
|
+ if t == plugin.PORTABLE_EXTENSION {
|
|
|
+ config.PortablePlugins[sinkName] = sinkInfo
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // get functions
|
|
|
+ for _, v := range de.Functions {
|
|
|
+ t, svcName, svcInfo := function.GetFunctionPlugin(v)
|
|
|
+ if t == plugin.NATIVE_EXTENSION {
|
|
|
+ config.NativePlugins[svcName] = svcInfo
|
|
|
+ }
|
|
|
+ if t == plugin.PORTABLE_EXTENSION {
|
|
|
+ config.PortablePlugins[svcName] = svcInfo
|
|
|
+ }
|
|
|
+ if t == plugin.SERVICE_EXTENSION {
|
|
|
+ config.Service[svcName] = svcInfo
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // get sourceCfg/sinkCfg
|
|
|
+ configKeys := meta.YamlConfigurationKeys{}
|
|
|
+ configKeys.Sources = de.SourceConfigKeys
|
|
|
+ configKeys.Sinks = de.SinkConfigKeys
|
|
|
+ configSet := meta.GetConfigurationsFor(configKeys)
|
|
|
+ config.SourceConfig = configSet.Sources
|
|
|
+ config.SinkConfig = configSet.Sinks
|
|
|
+ config.ConnectionConfig = configSet.Connections
|
|
|
+
|
|
|
+ //get schema
|
|
|
+ for _, v := range de.Schemas {
|
|
|
+ schName, schInfo := schema.GetSchemaInstallScript(v)
|
|
|
+ config.Schema[schName] = schInfo
|
|
|
+ }
|
|
|
+}
|