|
@@ -9,6 +9,7 @@ import (
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
"github.com/emqx/kuiper/xstream/nodes"
|
|
"github.com/emqx/kuiper/xstream/nodes"
|
|
"github.com/emqx/kuiper/xstream/test"
|
|
"github.com/emqx/kuiper/xstream/test"
|
|
|
|
+ "os"
|
|
"path"
|
|
"path"
|
|
"reflect"
|
|
"reflect"
|
|
"strings"
|
|
"strings"
|
|
@@ -28,6 +29,19 @@ func getDbDir() string {
|
|
return dbDir
|
|
return dbDir
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func cleanStateData() {
|
|
|
|
+ dbDir, err := common.GetDataLoc()
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Panic(err)
|
|
|
|
+ }
|
|
|
|
+ c := path.Join(dbDir, "checkpoints")
|
|
|
|
+ err = os.RemoveAll(c)
|
|
|
|
+ log.Errorf("%s", err)
|
|
|
|
+ s := path.Join(dbDir, "sink")
|
|
|
|
+ err = os.RemoveAll(s)
|
|
|
|
+ log.Errorf("%s", err)
|
|
|
|
+}
|
|
|
|
+
|
|
func TestStreamCreateProcessor(t *testing.T) {
|
|
func TestStreamCreateProcessor(t *testing.T) {
|
|
var tests = []struct {
|
|
var tests = []struct {
|
|
s string
|
|
s string
|
|
@@ -957,6 +971,7 @@ func TestSingleSQL(t *testing.T) {
|
|
}
|
|
}
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
}
|
|
}
|
|
|
|
+ cleanStateData()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1093,6 +1108,7 @@ func TestSingleSQLTemplate(t *testing.T) {
|
|
}
|
|
}
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
}
|
|
}
|
|
|
|
+ cleanStateData()
|
|
}
|
|
}
|
|
|
|
|
|
func TestNoneSingleSQLTemplate(t *testing.T) {
|
|
func TestNoneSingleSQLTemplate(t *testing.T) {
|
|
@@ -1202,6 +1218,7 @@ func TestNoneSingleSQLTemplate(t *testing.T) {
|
|
}
|
|
}
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
}
|
|
}
|
|
|
|
+ cleanStateData()
|
|
}
|
|
}
|
|
|
|
|
|
func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
|
|
func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
|
|
@@ -1571,6 +1588,7 @@ func TestSingleSQLError(t *testing.T) {
|
|
}
|
|
}
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
}
|
|
}
|
|
|
|
+ cleanStateData()
|
|
}
|
|
}
|
|
|
|
|
|
func TestWindow(t *testing.T) {
|
|
func TestWindow(t *testing.T) {
|
|
@@ -2161,6 +2179,7 @@ func TestWindow(t *testing.T) {
|
|
}
|
|
}
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
}
|
|
}
|
|
|
|
+ cleanStateData()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -2495,6 +2514,7 @@ func TestWindowError(t *testing.T) {
|
|
}
|
|
}
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
}
|
|
}
|
|
|
|
+ cleanStateData()
|
|
}
|
|
}
|
|
|
|
|
|
func createEventStreams(t *testing.T) {
|
|
func createEventStreams(t *testing.T) {
|
|
@@ -3375,7 +3395,9 @@ func TestEventWindow(t *testing.T) {
|
|
}
|
|
}
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
}
|
|
}
|
|
|
|
+ cleanStateData()
|
|
}
|
|
}
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
func getMetric(tp *xstream.TopologyNew, name string) int {
|
|
func getMetric(tp *xstream.TopologyNew, name string) int {
|