فهرست منبع

feat(processor): processors to import and export all streams/rules

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 سال پیش
والد
کامیت
76a59e0614

+ 23 - 0
internal/pkg/store/redis/redisKv.go

@@ -22,6 +22,7 @@ import (
 	"encoding/gob"
 	"fmt"
 	"github.com/gomodule/redigo/redis"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
 	"strings"
 )
@@ -125,6 +126,28 @@ func (kv redisKvStore) Keys() ([]string, error) {
 	return result, nil
 }
 
+func (kv redisKvStore) All() (map[string]string, error) {
+	keys, err := kv.metaKeys()
+	if err != nil {
+		return nil, err
+	}
+	var (
+		value  string
+		result = make(map[string]string)
+	)
+	for _, k := range keys {
+		key := kv.trimPrefix(k)
+		ok, err := kv.Get(key, &value)
+		if err != nil {
+			conf.Log.Errorf("get %s fail during get all in redi: %v", key, err)
+		}
+		if ok {
+			result[key] = value
+		}
+	}
+	return result, nil
+}
+
 func (kv redisKvStore) metaKeys() ([]string, error) {
 	keys := make([]string, 0)
 	err := kv.database.Apply(func(conn redis.Conn) error {

+ 8 - 0
internal/pkg/store/redis/redisKv_test.go

@@ -51,6 +51,14 @@ func TestRedisKvKeys(t *testing.T) {
 	common.TestKvKeys(length, ks, t)
 }
 
+func TestRedisKvAll(t *testing.T) {
+	ks, db, minRedis := setupRedisKv()
+	defer cleanRedisKv(db, minRedis)
+
+	length := 10
+	common.TestKvAll(length, ks, t)
+}
+
 func setupRedisKv() (kv.KeyValue, *Instance, *miniredis.Miniredis) {
 	minRedis, err := miniredis.Run()
 	if err != nil {

+ 31 - 0
internal/pkg/store/sql/sqlKv.go

@@ -141,6 +141,37 @@ func (kv *sqlKvStore) Keys() ([]string, error) {
 	return keys, err
 }
 
+func (kv *sqlKvStore) All() (all map[string]string, err error) {
+	err = kv.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("SELECT * FROM '%s'", kv.table)
+		row, e := db.Query(query)
+		if nil != e {
+			return e
+		}
+		defer row.Close()
+		var (
+			key      string
+			valBytes []byte
+			value    string
+		)
+		all = make(map[string]string)
+		for row.Next() {
+			e = row.Scan(&key, &valBytes)
+			if nil != e {
+				return e
+			} else {
+				dec := gob.NewDecoder(bytes.NewBuffer(valBytes))
+				if err := dec.Decode(&value); err != nil {
+					return err
+				}
+				all[key] = value
+			}
+		}
+		return nil
+	})
+	return
+}
+
 func (kv *sqlKvStore) Clean() error {
 	return kv.database.Apply(func(db *sql.DB) error {
 		query := fmt.Sprintf("DELETE FROM '%s'", kv.table)

+ 8 - 0
internal/pkg/store/sql/sqlKv_test.go

@@ -54,6 +54,14 @@ func TestSqlKvKeys(t *testing.T) {
 	common.TestKvKeys(length, ks, t)
 }
 
+func TestSqlKvAll(t *testing.T) {
+	ks, db, abs := setupSqlKv()
+	defer cleanSqlKv(db, abs)
+
+	length := 10
+	common.TestKvAll(length, ks, t)
+}
+
 func deleteIfExists(abs string) error {
 	absPath := path.Join(abs, SDbName)
 	if f, _ := os.Stat(absPath); f != nil {

+ 29 - 1
internal/pkg/store/test/common/test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -82,3 +82,31 @@ func TestKvKeys(length int, ks kv.KeyValue, t *testing.T) {
 		t.Errorf("Keys do not match expected %s != %s", keys, expected)
 	}
 }
+
+func TestKvAll(length int, ks kv.KeyValue, t *testing.T) {
+
+	expected := make(map[string]string)
+	for i := 0; i < length; i++ {
+		key := fmt.Sprintf("key-%d", i)
+		value := fmt.Sprintf("value-%d", i)
+		if err := ks.Setnx(key, value); err != nil {
+			t.Errorf("It should be set")
+		}
+		expected[key] = value
+	}
+
+	var (
+		all map[string]string
+		err error
+	)
+	if all, err = ks.All(); err != nil {
+		t.Errorf("Failed to get value: %s.", err)
+		return
+	} else if length != len(all) {
+		t.Errorf("expect: %d, got: %d", length, len(all))
+		return
+	}
+	if !reflect.DeepEqual(all, expected) {
+		t.Errorf("All values do not match expected %s != %s", all, expected)
+	}
+}

+ 4 - 0
internal/processor/rule.go

@@ -200,6 +200,10 @@ func (p *RuleProcessor) GetAllRules() ([]string, error) {
 	return p.db.Keys()
 }
 
+func (p *RuleProcessor) GetAllRulesJson() (map[string]string, error) {
+	return p.db.All()
+}
+
 func (p *RuleProcessor) ExecDrop(name string) (string, error) {
 	result := fmt.Sprintf("Rule %s is dropped.", name)
 	var ruleJson string

+ 33 - 1
internal/processor/rule_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -154,3 +154,35 @@ func TestRuleActionParse_Apply(t *testing.T) {
 	}
 
 }
+
+func TestAllRules(t *testing.T) {
+	expected := map[string]string{
+		"rule1": "{\"id\": \"rule1\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{  \"log\": {}}]}",
+		"rule2": "{\"id\": \"rule2\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{  \"log\": {}}]}",
+		"rule3": "{\"id\": \"rule3\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{  \"log\": {}}]}",
+	}
+	sp := NewStreamProcessor()
+	defer sp.db.Clean()
+	sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
+	p := NewRuleProcessor()
+	p.db.Clean()
+	defer p.db.Clean()
+
+	for k, v := range expected {
+		_, err := p.ExecCreate(k, v)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		defer p.ExecDrop(k)
+	}
+
+	all, err := p.GetAllRulesJson()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	if !reflect.DeepEqual(all, expected) {
+		t.Errorf("Expect\t %v\nBut got\t%v", expected, all)
+	}
+}

+ 91 - 0
internal/processor/ruleset.go

@@ -0,0 +1,91 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package processor
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"io"
+)
+
+type RulesetProcessor struct {
+	r *RuleProcessor
+	s *StreamProcessor
+}
+
+type ruleset struct {
+	Streams map[string]string `json:"streams"`
+	Tables  map[string]string `json:"tables"`
+	Rules   map[string]string `json:"rules"`
+}
+
+func NewRulesetProcessor(r *RuleProcessor, s *StreamProcessor) *RulesetProcessor {
+	return &RulesetProcessor{
+		r: r,
+		s: s,
+	}
+}
+
+func (rs *RulesetProcessor) Export() (io.Reader, error) {
+	var all ruleset
+	allStreams, err := rs.s.GetAll()
+	if err != nil {
+		return nil, fmt.Errorf("fail to get all streams: %v", err)
+	}
+	all.Streams = allStreams["streams"]
+	all.Tables = allStreams["tables"]
+	rules, err := rs.r.GetAllRulesJson()
+	if err != nil {
+		return nil, fmt.Errorf("fail to get all rules: %v", err)
+	}
+	all.Rules = rules
+	jsonBytes, err := json.Marshal(all)
+	if err != nil {
+		return nil, err
+	}
+	return bytes.NewBuffer(jsonBytes), nil
+}
+
+func (rs *RulesetProcessor) Import(content []byte, overwrite bool) error {
+	all := &ruleset{}
+	err := json.Unmarshal(content, all)
+	if err != nil {
+		return fmt.Errorf("invalid import file: %v", err)
+	}
+	// restore streams
+	for k, v := range all.Streams {
+		_, e := rs.s.ExecStreamSql(v)
+		if e != nil {
+			conf.Log.Errorf("Fail to import stream %s(%s) with error: %v", k, v, e)
+		}
+	}
+	// restore tables
+	for k, v := range all.Tables {
+		_, e := rs.s.ExecStreamSql(v)
+		if e != nil {
+			conf.Log.Errorf("Fail to import table %s(%s) with error: %v", k, v, e)
+		}
+	}
+	// restore rules
+	for k, v := range all.Rules {
+		_, e := rs.r.ExecCreate("", v)
+		if e != nil {
+			conf.Log.Errorf("Fail to import rule %s(%s) with error: %v", k, v, e)
+		}
+	}
+	return nil
+}

+ 76 - 0
internal/processor/ruleset_test.go

@@ -0,0 +1,76 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package processor
+
+import (
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"io"
+	"reflect"
+	"strings"
+	"testing"
+)
+
+func TestIO(t *testing.T) {
+	expected := `{"streams":{"demo":"CREATE STREAM demo () WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"},"tables":{},"rules":{"rule1":"{\"id\":\"rule1\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{\"log\": {}}]}","rule2":"{\"id\": \"rule2\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{  \"log\": {}}]}"}}`
+	expectedStreams := []string{"demo"}
+	expectedRules := []string{"rule1", "rule2"}
+	sp := NewStreamProcessor()
+	defer sp.db.Clean()
+	rp := NewRuleProcessor()
+	defer rp.db.Clean()
+	rsp := NewRulesetProcessor(rp, sp)
+
+	err := rsp.Import([]byte(expected), true)
+	if err != nil {
+		t.Errorf("fail to import ruleset: %v", err)
+		return
+	}
+
+	streams, err := sp.execShow(ast.TypeStream)
+	if err != nil {
+		t.Errorf("fail to get all streams: %v", err)
+		return
+	}
+	if !reflect.DeepEqual(streams, expectedStreams) {
+		t.Errorf("After import, expect streams %v, but got %v", expectedStreams, streams)
+		return
+	}
+
+	rules, err := rp.GetAllRules()
+	if err != nil {
+		t.Errorf("fail to get all rules: %v", err)
+		return
+	}
+	if !reflect.DeepEqual(rules, expectedRules) {
+		t.Errorf("After import, expect rules %v, but got %v", expectedRules, rules)
+		return
+	}
+
+	exp, err := rsp.Export()
+	if err != nil {
+		t.Errorf("fail to export ruleset: %v", err)
+		return
+	}
+	buf := new(strings.Builder)
+	_, err = io.Copy(buf, exp)
+	if err != nil {
+		t.Errorf("fail to convert exported ruleset: %v", err)
+		return
+	}
+	actual := buf.String()
+	if actual != expected {
+		t.Errorf("Expect\t\n %v but got\t\n %v", expected, actual)
+	}
+}

+ 27 - 0
internal/processor/stream.go

@@ -398,3 +398,30 @@ func printFieldType(ft ast.FieldType) (result string) {
 	}
 	return
 }
+
+// GetAll return all streams and tables defined to export.
+func (p *StreamProcessor) GetAll() (result map[string]map[string]string, e error) {
+	defs, err := p.db.All()
+	if err != nil {
+		e = err
+		return
+	}
+	var (
+		vs = &xsql.StreamInfo{}
+	)
+	result = map[string]map[string]string{
+		"streams": make(map[string]string),
+		"tables":  make(map[string]string),
+	}
+	for k, v := range defs {
+		if err := json.Unmarshal([]byte(v), vs); err == nil {
+			switch vs.StreamType {
+			case ast.TypeStream:
+				result["streams"][k] = vs.Statement
+			case ast.TypeTable:
+				result["tables"][k] = vs.Statement
+			}
+		}
+	}
+	return
+}

+ 32 - 0
internal/processor/stream_test.go

@@ -235,3 +235,35 @@ func TestTableList(t *testing.T) {
 		return
 	}
 }
+
+func TestAll(t *testing.T) {
+	expected := map[string]map[string]string{
+		"streams": {
+			"demo":  "create stream demo () WITH (FORMAT=\"JSON\", DATASOURCE=\"demo\", SHARED=\"TRUE\")",
+			"demo1": "create stream demo1 () WITH (FORMAT=\"JSON\", DATASOURCE=\"demo\")",
+			"demo2": "create stream demo2 () WITH (FORMAT=\"JSON\", DATASOURCE=\"demo\", SHARED=\"TRUE\")",
+			"demo3": "create stream demo3 () WITH (FORMAT=\"JSON\", DATASOURCE=\"demo\", SHARED=\"TRUE\")",
+		},
+		"tables": {
+			"tt1": `CREATE TABLE tt1 () WITH (DATASOURCE="users", FORMAT="JSON", KIND="scan")`,
+			"tt3": `CREATE TABLE tt3 () WITH (DATASOURCE="users", TYPE="memory", FORMAT="JSON", KEY="id", KIND="lookup")`,
+		},
+	}
+	p := NewStreamProcessor()
+	p.db.Clean()
+	defer p.db.Clean()
+	for st, m := range expected {
+		for k, v := range m {
+			p.ExecStmt(v)
+			defer p.ExecStmt("Drop " + st + " " + k)
+		}
+	}
+	all, err := p.GetAll()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	if !reflect.DeepEqual(all, expected) {
+		t.Errorf("Expect\t %v\nBut got\t%v", expected, all)
+	}
+}

+ 2 - 1
pkg/kv/kv.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ type KeyValue interface {
 	//Must return *common.Error with NOT_FOUND error
 	Delete(key string) error
 	Keys() (keys []string, err error)
+	All() (all map[string]string, err error)
 	Clean() error
 	Drop() error
 }