123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- // Copyright 2022-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package processor
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "io"
- "github.com/lf-edge/ekuiper/internal/conf"
- )
- type RulesetProcessor struct {
- r *RuleProcessor
- s *StreamProcessor
- }
- type Ruleset struct {
- Streams map[string]string `json:"streams"`
- Tables map[string]string `json:"tables"`
- Rules map[string]string `json:"rules"`
- }
- func NewRulesetProcessor(r *RuleProcessor, s *StreamProcessor) *RulesetProcessor {
- return &RulesetProcessor{
- r: r,
- s: s,
- }
- }
- func (rs *RulesetProcessor) Export() (io.ReadSeeker, []int, error) {
- var all Ruleset
- allStreams, err := rs.s.GetAll()
- if err != nil {
- return nil, nil, fmt.Errorf("fail to get all streams: %v", err)
- }
- all.Streams = allStreams["streams"]
- all.Tables = allStreams["tables"]
- rules, err := rs.r.GetAllRulesJson()
- if err != nil {
- return nil, nil, fmt.Errorf("fail to get all rules: %v", err)
- }
- all.Rules = rules
- jsonBytes, err := json.Marshal(all)
- if err != nil {
- return nil, nil, err
- }
- counts := []int{len(all.Streams), len(all.Tables), len(all.Rules)}
- return bytes.NewReader(jsonBytes), counts, nil
- }
- func (rs *RulesetProcessor) ExportRuleSet() *Ruleset {
- all := &Ruleset{}
- allStreams, err := rs.s.GetAll()
- if err != nil {
- conf.Log.Errorf("fail to get all streams: %v", err)
- return nil
- }
- all.Streams = allStreams["streams"]
- all.Tables = allStreams["tables"]
- rules, err := rs.r.GetAllRulesJson()
- if err != nil {
- conf.Log.Errorf("fail to get all rules: %v", err)
- return nil
- }
- all.Rules = rules
- return all
- }
- func (rs *RulesetProcessor) ExportRuleSetStatus() *Ruleset {
- all := &Ruleset{}
- allStreams, err := rs.s.streamStatusDb.All()
- if err != nil {
- conf.Log.Errorf("fail to get all stream status: %v", err)
- return nil
- }
- allTables, err := rs.s.tableStatusDb.All()
- if err != nil {
- conf.Log.Errorf("fail to get all table status: %v", err)
- return nil
- }
- all.Streams = allStreams
- all.Tables = allTables
- rules, err := rs.r.ruleStatusDb.All()
- if err != nil {
- conf.Log.Errorf("fail to get all rule status: %v", err)
- return nil
- }
- all.Rules = rules
- return all
- }
- func (rs *RulesetProcessor) Import(content []byte) ([]string, []int, error) {
- all := &Ruleset{}
- err := json.Unmarshal(content, all)
- if err != nil {
- return nil, nil, fmt.Errorf("invalid import file: %v", err)
- }
- counts := make([]int, 3)
- // restore streams
- for k, v := range all.Streams {
- _, e := rs.s.ExecStreamSql(v)
- if e != nil {
- conf.Log.Errorf("Fail to import stream %s(%s) with error: %v", k, v, e)
- } else {
- counts[0]++
- }
- }
- // restore tables
- for k, v := range all.Tables {
- _, e := rs.s.ExecStreamSql(v)
- if e != nil {
- conf.Log.Errorf("Fail to import table %s(%s) with error: %v", k, v, e)
- } else {
- counts[1]++
- }
- }
- var rules []string
- // restore rules
- for k, v := range all.Rules {
- _, e := rs.r.ExecCreateWithValidation(k, v)
- if e != nil {
- conf.Log.Errorf("Fail to import rule %s(%s) with error: %v", k, v, e)
- } else {
- rules = append(rules, k)
- counts[2]++
- }
- }
- return rules, counts, nil
- }
- func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) Ruleset {
- ruleSetRsp := Ruleset{
- Rules: map[string]string{},
- Streams: map[string]string{},
- Tables: map[string]string{},
- }
- _ = rs.s.streamStatusDb.Clean()
- _ = rs.s.tableStatusDb.Clean()
- _ = rs.r.ruleStatusDb.Clean()
- counts := make([]int, 3)
- // restore streams
- for k, v := range all.Streams {
- _, e := rs.s.ExecStreamSql(v)
- if e != nil {
- conf.Log.Errorf("Fail to import stream %s(%s) with error: %v", k, v, e)
- _ = rs.s.streamStatusDb.Set(k, e.Error())
- ruleSetRsp.Streams[k] = e.Error()
- continue
- }
- counts[0]++
- }
- // restore tables
- for k, v := range all.Tables {
- _, e := rs.s.ExecStreamSql(v)
- if e != nil {
- conf.Log.Errorf("Fail to import table %s(%s) with error: %v", k, v, e)
- _ = rs.s.tableStatusDb.Set(k, e.Error())
- ruleSetRsp.Tables[k] = e.Error()
- continue
- }
- counts[1]++
- }
- // restore rules
- for k, v := range all.Rules {
- _, e := rs.r.ExecCreateWithValidation(k, v)
- if e != nil {
- conf.Log.Errorf("Fail to import rule %s(%s) with error: %v", k, v, e)
- _ = rs.r.ruleStatusDb.Set(k, e.Error())
- ruleSetRsp.Rules[k] = e.Error()
- continue
- }
- counts[2]++
- }
- return ruleSetRsp
- }
|