123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- // Copyright 2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package processor
- import (
- "encoding/json"
- "github.com/lf-edge/ekuiper/internal/binder/function"
- "github.com/lf-edge/ekuiper/internal/binder/io"
- "github.com/lf-edge/ekuiper/internal/meta"
- store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
- "github.com/lf-edge/ekuiper/internal/plugin"
- "github.com/lf-edge/ekuiper/internal/schema"
- "github.com/lf-edge/ekuiper/internal/topo/node/conf"
- "github.com/lf-edge/ekuiper/internal/xsql"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/ast"
- "strings"
- )
- type RuleMigrationProcessor struct {
- r *RuleProcessor
- s *StreamProcessor
- }
- func NewRuleMigrationProcessor(r *RuleProcessor, s *StreamProcessor) *RuleMigrationProcessor {
- return &RuleMigrationProcessor{
- r: r,
- s: s,
- }
- }
- func NewDependencies() *Dependencies {
- return &Dependencies{
- SourceConfigKeys: map[string][]string{},
- SinkConfigKeys: map[string][]string{},
- }
- }
// Dependencies accumulates the resources that a set of rules relies on.
// It is filled by ruleTraverse and may contain duplicate entries when several
// rules share a resource.
type Dependencies struct {
	// Rules holds the ids of the rules that were traversed.
	Rules []string
	// Streams holds the names of the referenced streams.
	Streams []string
	// Tables holds the names of the referenced tables.
	Tables []string
	// Sources holds the source type of every referenced stream or table.
	Sources []string
	// Sinks holds the sink name of every rule action.
	Sinks []string
	// SourceConfigKeys maps a source type to the configuration keys used with it.
	SourceConfigKeys map[string][]string
	// SinkConfigKeys maps a sink name to the resource ids used with it.
	SinkConfigKeys map[string][]string
	// Functions holds the names of the functions called in the rule SQL.
	Functions []string
	// Schemas holds schema identifiers in the form "<format>_<schema name>".
	Schemas []string
}
- func ruleTraverse(rule *api.Rule, de *Dependencies) {
- sql := rule.Sql
- if sql != "" {
- stmt, err := xsql.GetStatementFromSql(sql)
- if err != nil {
- return
- }
- err, store := store2.GetKV("stream")
- if err != nil {
- return
- }
- //streams
- streamsFromStmt := xsql.GetStreams(stmt)
- for _, s := range streamsFromStmt {
- streamStmt, err := xsql.GetDataSource(store, s)
- if err != nil {
- continue
- }
- if streamStmt.StreamType == ast.TypeStream {
- //get streams
- de.Streams = append(de.Streams, string(streamStmt.Name))
- } else if streamStmt.StreamType == ast.TypeTable {
- //get tables
- de.Tables = append(de.Tables, string(streamStmt.Name))
- }
- //get source type
- de.Sources = append(de.Sources, streamStmt.Options.TYPE)
- //get config key
- _, ok := de.SourceConfigKeys[streamStmt.Options.TYPE]
- if ok {
- de.SourceConfigKeys[streamStmt.Options.TYPE] = append(de.SourceConfigKeys[streamStmt.Options.TYPE], streamStmt.Options.CONF_KEY)
- } else {
- var confKeys []string
- confKeys = append(confKeys, streamStmt.Options.CONF_KEY)
- de.SourceConfigKeys[streamStmt.Options.TYPE] = confKeys
- }
- //get schema id
- if streamStmt.Options.SCHEMAID != "" {
- r := strings.Split(streamStmt.Options.SCHEMAID, ".")
- de.Schemas = append(de.Schemas, streamStmt.Options.FORMAT+"_"+r[0])
- }
- }
- //actions
- for _, m := range rule.Actions {
- for name, action := range m {
- props, _ := action.(map[string]interface{})
- de.Sinks = append(de.Sinks, name)
- resourceId, ok := props[conf.ResourceID].(string)
- if ok {
- _, ok := de.SinkConfigKeys[name]
- if ok {
- de.SinkConfigKeys[name] = append(de.SinkConfigKeys[name], resourceId)
- } else {
- var confKeys []string
- confKeys = append(confKeys, resourceId)
- de.SinkConfigKeys[name] = confKeys
- }
- }
- format, ok := props["format"].(string)
- if ok && format != "json" {
- schemaId, ok := props["schemaId"].(string)
- if ok {
- r := strings.Split(schemaId, ".")
- de.Schemas = append(de.Schemas, format+"_"+r[0])
- }
- }
- }
- }
- // function
- ast.WalkFunc(stmt, func(n ast.Node) bool {
- switch f := n.(type) {
- case *ast.Call:
- de.Functions = append(de.Functions, f.Name)
- }
- return true
- })
- //Rules
- de.Rules = append(de.Rules, rule.Id)
- }
- }
// Configuration is the JSON document produced by a partial export. Every
// field maps a resource name to its serialized definition or install script.
type Configuration struct {
	// Streams maps a stream name to its definition.
	Streams map[string]string `json:"streams"`
	// Tables maps a table name to its definition.
	Tables map[string]string `json:"tables"`
	// Rules maps a rule id to its JSON definition.
	Rules map[string]string `json:"rules"`
	// NativePlugins maps a native plugin name to its plugin information.
	NativePlugins map[string]string `json:"nativePlugins"`
	// PortablePlugins maps a portable plugin name to its plugin information.
	PortablePlugins map[string]string `json:"portablePlugins"`
	// SourceConfig holds the exported source configurations.
	SourceConfig map[string]string `json:"sourceConfig"`
	// SinkConfig holds the exported sink configurations.
	SinkConfig map[string]string `json:"sinkConfig"`
	// ConnectionConfig holds the exported connection configurations.
	ConnectionConfig map[string]string `json:"connectionConfig"`
	// Service maps an external service name to its information.
	Service map[string]string `json:"Service"`
	// Schema maps a schema name to its install script.
	Schema map[string]string `json:"Schema"`
}
- func (p *RuleMigrationProcessor) ConfigurationPartialExport(rules []string) ([]byte, error) {
- conf := &Configuration{
- Streams: make(map[string]string),
- Tables: make(map[string]string),
- Rules: make(map[string]string),
- NativePlugins: make(map[string]string),
- PortablePlugins: make(map[string]string),
- SourceConfig: make(map[string]string),
- SinkConfig: make(map[string]string),
- ConnectionConfig: make(map[string]string),
- Service: make(map[string]string),
- Schema: make(map[string]string),
- }
- conf.Rules = p.exportRules(rules)
- de := NewDependencies()
- for _, v := range rules {
- rule, _ := p.r.GetRuleById(v)
- if rule != nil {
- ruleTraverse(rule, de)
- }
- }
- p.exportSelected(de, conf)
- return json.Marshal(conf)
- }
- func (p *RuleMigrationProcessor) exportRules(rules []string) map[string]string {
- ruleSet := make(map[string]string)
- for _, v := range rules {
- ruleJson, _ := p.r.GetRuleJson(v)
- ruleSet[v] = ruleJson
- }
- return ruleSet
- }
- func (p *RuleMigrationProcessor) exportStreams(streams []string) map[string]string {
- streamSet := make(map[string]string)
- for _, v := range streams {
- streamJson, _ := p.s.GetStream(v, ast.TypeStream)
- streamSet[v] = streamJson
- }
- return streamSet
- }
- func (p *RuleMigrationProcessor) exportTables(tables []string) map[string]string {
- tableSet := make(map[string]string)
- for _, v := range tables {
- tableJson, _ := p.s.GetStream(v, ast.TypeTable)
- tableSet[v] = tableJson
- }
- return tableSet
- }
- func (p *RuleMigrationProcessor) exportSelected(de *Dependencies, config *Configuration) {
- //get the stream and table
- config.Streams = p.exportStreams(de.Streams)
- config.Tables = p.exportTables(de.Tables)
- //get the sources
- for _, v := range de.Sources {
- t, srcName, srcInfo := io.GetSourcePlugin(v)
- if t == plugin.NATIVE_EXTENSION {
- config.NativePlugins[srcName] = srcInfo
- }
- if t == plugin.PORTABLE_EXTENSION {
- config.PortablePlugins[srcName] = srcInfo
- }
- }
- // get sinks
- for _, v := range de.Sinks {
- t, sinkName, sinkInfo := io.GetSinkPlugin(v)
- if t == plugin.NATIVE_EXTENSION {
- config.NativePlugins[sinkName] = sinkInfo
- }
- if t == plugin.PORTABLE_EXTENSION {
- config.PortablePlugins[sinkName] = sinkInfo
- }
- }
- // get functions
- for _, v := range de.Functions {
- t, svcName, svcInfo := function.GetFunctionPlugin(v)
- if t == plugin.NATIVE_EXTENSION {
- config.NativePlugins[svcName] = svcInfo
- }
- if t == plugin.PORTABLE_EXTENSION {
- config.PortablePlugins[svcName] = svcInfo
- }
- if t == plugin.SERVICE_EXTENSION {
- config.Service[svcName] = svcInfo
- }
- }
- // get sourceCfg/sinkCfg
- configKeys := meta.YamlConfigurationKeys{}
- configKeys.Sources = de.SourceConfigKeys
- configKeys.Sinks = de.SinkConfigKeys
- configSet := meta.GetConfigurationsFor(configKeys)
- config.SourceConfig = configSet.Sources
- config.SinkConfig = configSet.Sinks
- config.ConnectionConfig = configSet.Connections
- //get schema
- for _, v := range de.Schemas {
- schName, schInfo := schema.GetSchemaInstallScript(v)
- config.Schema[schName] = schInfo
- }
- }
|