|
@@ -5,6 +5,7 @@ import (
|
|
"fmt"
|
|
"fmt"
|
|
"github.com/emqx/kuiper/common"
|
|
"github.com/emqx/kuiper/common"
|
|
"github.com/emqx/kuiper/xsql"
|
|
"github.com/emqx/kuiper/xsql"
|
|
|
|
+ "github.com/emqx/kuiper/xstream"
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
"github.com/emqx/kuiper/xstream/nodes"
|
|
"github.com/emqx/kuiper/xstream/nodes"
|
|
"github.com/emqx/kuiper/xstream/test"
|
|
"github.com/emqx/kuiper/xstream/test"
|
|
@@ -89,24 +90,25 @@ func TestStreamCreateProcessor(t *testing.T) {
|
|
|
|
|
|
streamDB := path.Join(getDbDir(), "streamTest")
|
|
streamDB := path.Join(getDbDir(), "streamTest")
|
|
for i, tt := range tests {
|
|
for i, tt := range tests {
|
|
- results, err := NewStreamProcessor(tt.s, streamDB).Exec()
|
|
|
|
|
|
+ results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
|
|
if !reflect.DeepEqual(tt.err, errstring(err)) {
|
|
if !reflect.DeepEqual(tt.err, errstring(err)) {
|
|
t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
|
|
t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
|
|
} else if tt.err == "" {
|
|
} else if tt.err == "" {
|
|
if !reflect.DeepEqual(tt.r, results) {
|
|
if !reflect.DeepEqual(tt.r, results) {
|
|
- t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s,tt.r, results)
|
|
|
|
|
|
+ t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func createStreams(t *testing.T) {
|
|
func createStreams(t *testing.T) {
|
|
|
|
+ p := NewStreamProcessor(path.Join(DbDir, "stream"))
|
|
demo := `CREATE STREAM demo (
|
|
demo := `CREATE STREAM demo (
|
|
color STRING,
|
|
color STRING,
|
|
size BIGINT,
|
|
size BIGINT,
|
|
ts BIGINT
|
|
ts BIGINT
|
|
) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
|
|
) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
|
|
- _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err := p.ExecStmt(demo)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
@@ -115,7 +117,7 @@ func createStreams(t *testing.T) {
|
|
hum BIGINT,
|
|
hum BIGINT,
|
|
ts BIGINT
|
|
ts BIGINT
|
|
) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
|
|
) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
|
|
- _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err = p.ExecStmt(demo1)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
@@ -124,31 +126,32 @@ func createStreams(t *testing.T) {
|
|
hum BIGINT,
|
|
hum BIGINT,
|
|
ts BIGINT
|
|
ts BIGINT
|
|
) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
|
|
) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
|
|
- _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err = p.ExecStmt(sessionDemo)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func dropStreams(t *testing.T) {
|
|
func dropStreams(t *testing.T) {
|
|
|
|
+ p := NewStreamProcessor(path.Join(DbDir, "stream"))
|
|
demo := `DROP STREAM demo`
|
|
demo := `DROP STREAM demo`
|
|
- _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err := p.ExecStmt(demo)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
demo1 := `DROP STREAM demo1`
|
|
demo1 := `DROP STREAM demo1`
|
|
- _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err = p.ExecStmt(demo1)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
sessionDemo := `DROP STREAM sessionDemo`
|
|
sessionDemo := `DROP STREAM sessionDemo`
|
|
- _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err = p.ExecStmt(sessionDemo)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func getMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode {
|
|
|
|
|
|
+func getMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
|
|
var data []*xsql.Tuple
|
|
var data []*xsql.Tuple
|
|
switch name {
|
|
switch name {
|
|
case "demo":
|
|
case "demo":
|
|
@@ -360,6 +363,7 @@ func TestSingleSQL(t *testing.T) {
|
|
name string
|
|
name string
|
|
sql string
|
|
sql string
|
|
r [][]map[string]interface{}
|
|
r [][]map[string]interface{}
|
|
|
|
+ s string
|
|
m map[string]interface{}
|
|
m map[string]interface{}
|
|
}{
|
|
}{
|
|
{
|
|
{
|
|
@@ -403,14 +407,15 @@ func TestSingleSQL(t *testing.T) {
|
|
"op_project_0_records_in_total": int64(5),
|
|
"op_project_0_records_in_total": int64(5),
|
|
"op_project_0_records_out_total": int64(5),
|
|
"op_project_0_records_out_total": int64(5),
|
|
|
|
|
|
- "sink_MockSink_0_exceptions_total": int64(0),
|
|
|
|
- "sink_MockSink_0_records_in_total": int64(5),
|
|
|
|
- "sink_MockSink_0_records_out_total": int64(5),
|
|
|
|
|
|
+ "sink_mockSink_0_exceptions_total": int64(0),
|
|
|
|
+ "sink_mockSink_0_records_in_total": int64(5),
|
|
|
|
+ "sink_mockSink_0_records_out_total": int64(5),
|
|
|
|
|
|
"source_demo_0_exceptions_total": int64(0),
|
|
"source_demo_0_exceptions_total": int64(0),
|
|
"source_demo_0_records_in_total": int64(5),
|
|
"source_demo_0_records_in_total": int64(5),
|
|
"source_demo_0_records_out_total": int64(5),
|
|
"source_demo_0_records_out_total": int64(5),
|
|
},
|
|
},
|
|
|
|
+ s: "sink_mockSink_0_records_out_total",
|
|
}, {
|
|
}, {
|
|
name: `rule2`,
|
|
name: `rule2`,
|
|
sql: `SELECT color, ts FROM demo where size > 3`,
|
|
sql: `SELECT color, ts FROM demo where size > 3`,
|
|
@@ -424,6 +429,7 @@ func TestSingleSQL(t *testing.T) {
|
|
"ts": float64(1541152488442),
|
|
"ts": float64(1541152488442),
|
|
}},
|
|
}},
|
|
},
|
|
},
|
|
|
|
+ s: "op_filter_0_records_in_total",
|
|
m: map[string]interface{}{
|
|
m: map[string]interface{}{
|
|
"op_preprocessor_demo_0_exceptions_total": int64(0),
|
|
"op_preprocessor_demo_0_exceptions_total": int64(0),
|
|
"op_preprocessor_demo_0_process_latency_ms": int64(0),
|
|
"op_preprocessor_demo_0_process_latency_ms": int64(0),
|
|
@@ -435,9 +441,9 @@ func TestSingleSQL(t *testing.T) {
|
|
"op_project_0_records_in_total": int64(2),
|
|
"op_project_0_records_in_total": int64(2),
|
|
"op_project_0_records_out_total": int64(2),
|
|
"op_project_0_records_out_total": int64(2),
|
|
|
|
|
|
- "sink_MockSink_0_exceptions_total": int64(0),
|
|
|
|
- "sink_MockSink_0_records_in_total": int64(2),
|
|
|
|
- "sink_MockSink_0_records_out_total": int64(2),
|
|
|
|
|
|
+ "sink_mockSink_0_exceptions_total": int64(0),
|
|
|
|
+ "sink_mockSink_0_records_in_total": int64(2),
|
|
|
|
+ "sink_mockSink_0_records_out_total": int64(2),
|
|
|
|
|
|
"source_demo_0_exceptions_total": int64(0),
|
|
"source_demo_0_exceptions_total": int64(0),
|
|
"source_demo_0_records_in_total": int64(5),
|
|
"source_demo_0_records_in_total": int64(5),
|
|
@@ -461,6 +467,7 @@ func TestSingleSQL(t *testing.T) {
|
|
"ts": float64(1541152488442),
|
|
"ts": float64(1541152488442),
|
|
}},
|
|
}},
|
|
},
|
|
},
|
|
|
|
+ s: "op_filter_0_records_in_total",
|
|
m: map[string]interface{}{
|
|
m: map[string]interface{}{
|
|
"op_preprocessor_demo_0_exceptions_total": int64(0),
|
|
"op_preprocessor_demo_0_exceptions_total": int64(0),
|
|
"op_preprocessor_demo_0_process_latency_ms": int64(0),
|
|
"op_preprocessor_demo_0_process_latency_ms": int64(0),
|
|
@@ -472,9 +479,9 @@ func TestSingleSQL(t *testing.T) {
|
|
"op_project_0_records_in_total": int64(2),
|
|
"op_project_0_records_in_total": int64(2),
|
|
"op_project_0_records_out_total": int64(2),
|
|
"op_project_0_records_out_total": int64(2),
|
|
|
|
|
|
- "sink_MockSink_0_exceptions_total": int64(0),
|
|
|
|
- "sink_MockSink_0_records_in_total": int64(2),
|
|
|
|
- "sink_MockSink_0_records_out_total": int64(2),
|
|
|
|
|
|
+ "sink_mockSink_0_exceptions_total": int64(0),
|
|
|
|
+ "sink_mockSink_0_records_in_total": int64(2),
|
|
|
|
+ "sink_mockSink_0_records_out_total": int64(2),
|
|
|
|
|
|
"source_demo_0_exceptions_total": int64(0),
|
|
"source_demo_0_exceptions_total": int64(0),
|
|
"source_demo_0_records_in_total": int64(5),
|
|
"source_demo_0_records_in_total": int64(5),
|
|
@@ -490,12 +497,15 @@ func TestSingleSQL(t *testing.T) {
|
|
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
|
|
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
|
|
createStreams(t)
|
|
createStreams(t)
|
|
defer dropStreams(t)
|
|
defer dropStreams(t)
|
|
- done := make(chan struct{})
|
|
|
|
//defer close(done)
|
|
//defer close(done)
|
|
for i, tt := range tests {
|
|
for i, tt := range tests {
|
|
|
|
+ test.ResetClock(1541152486000)
|
|
p := NewRuleProcessor(DbDir)
|
|
p := NewRuleProcessor(DbDir)
|
|
parser := xsql.NewParser(strings.NewReader(tt.sql))
|
|
parser := xsql.NewParser(strings.NewReader(tt.sql))
|
|
- var sources []*nodes.SourceNode
|
|
|
|
|
|
+ var (
|
|
|
|
+ sources []*nodes.SourceNode
|
|
|
|
+ syncs []chan int
|
|
|
|
+ )
|
|
if stmt, err := xsql.Language.Parse(parser); err != nil {
|
|
if stmt, err := xsql.Language.Parse(parser); err != nil {
|
|
t.Errorf("parse sql %s error: %s", tt.sql, err)
|
|
t.Errorf("parse sql %s error: %s", tt.sql, err)
|
|
} else {
|
|
} else {
|
|
@@ -504,41 +514,40 @@ func TestSingleSQL(t *testing.T) {
|
|
} else {
|
|
} else {
|
|
streams := xsql.GetStreams(selectStmt)
|
|
streams := xsql.GetStreams(selectStmt)
|
|
for _, stream := range streams {
|
|
for _, stream := range streams {
|
|
- source := getMockSource(stream, done, 5)
|
|
|
|
|
|
+ next := make(chan int)
|
|
|
|
+ syncs = append(syncs, next)
|
|
|
|
+ source := getMockSource(stream, next, 5)
|
|
sources = append(sources, source)
|
|
sources = append(sources, source)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options:map[string]interface{}{
|
|
|
|
|
|
+ tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
|
|
"bufferLength": float64(100),
|
|
"bufferLength": float64(100),
|
|
}}, sources)
|
|
}}, sources)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Error(err)
|
|
t.Error(err)
|
|
}
|
|
}
|
|
mockSink := test.NewMockSink()
|
|
mockSink := test.NewMockSink()
|
|
- sink := nodes.NewSinkNodeWithSink("MockSink", mockSink)
|
|
|
|
|
|
+ sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
|
|
tp.AddSink(inputs, sink)
|
|
tp.AddSink(inputs, sink)
|
|
- count := len(sources)
|
|
|
|
errCh := tp.Open()
|
|
errCh := tp.Open()
|
|
func() {
|
|
func() {
|
|
- for {
|
|
|
|
|
|
+ for i := 0; i < 5; i++ {
|
|
|
|
+ syncs[i%len(syncs)] <- i
|
|
select {
|
|
select {
|
|
case err = <-errCh:
|
|
case err = <-errCh:
|
|
t.Log(err)
|
|
t.Log(err)
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
return
|
|
return
|
|
- case <-done:
|
|
|
|
- count--
|
|
|
|
- log.Infof("%d sources remaining", count)
|
|
|
|
- if count <= 0 {
|
|
|
|
- log.Info("stream stopping")
|
|
|
|
- time.Sleep(1 * time.Second)
|
|
|
|
- tp.Cancel()
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
default:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ for retry := 100; retry > 0; retry-- {
|
|
|
|
+ if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ time.Sleep(time.Duration(retry) * time.Millisecond)
|
|
|
|
+ }
|
|
}()
|
|
}()
|
|
results := mockSink.GetResults()
|
|
results := mockSink.GetResults()
|
|
var maps [][]map[string]interface{}
|
|
var maps [][]map[string]interface{}
|
|
@@ -551,40 +560,18 @@ func TestSingleSQL(t *testing.T) {
|
|
}
|
|
}
|
|
maps = append(maps, mapRes)
|
|
maps = append(maps, mapRes)
|
|
}
|
|
}
|
|
- keys, values := tp.GetMetrics()
|
|
|
|
- //for i, k := range keys{
|
|
|
|
- // log.Printf("%s:%v", k, values[i])
|
|
|
|
- //}
|
|
|
|
- for k, v := range tt.m {
|
|
|
|
- var(
|
|
|
|
- index int
|
|
|
|
- key string
|
|
|
|
- matched bool
|
|
|
|
- )
|
|
|
|
- for index, key = range keys{
|
|
|
|
- if k == key {
|
|
|
|
- if values[index] == v{
|
|
|
|
- matched = true
|
|
|
|
- }
|
|
|
|
- break
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if matched{
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- //do not find
|
|
|
|
- if index < len(values){
|
|
|
|
- t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", i, tt.sql, k, v, v, values[index], values[index])
|
|
|
|
- }else{
|
|
|
|
- t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", i, tt.sql, k, v)
|
|
|
|
- }
|
|
|
|
- break
|
|
|
|
|
|
+ if !reflect.DeepEqual(tt.r, maps) {
|
|
|
|
+ t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
|
|
|
|
+ t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
|
|
}
|
|
}
|
|
|
|
+ tp.Cancel()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func TestWindow(t *testing.T) {
|
|
func TestWindow(t *testing.T) {
|
|
- common.IsTesting = true
|
|
|
|
var tests = []struct {
|
|
var tests = []struct {
|
|
name string
|
|
name string
|
|
sql string
|
|
sql string
|
|
@@ -970,13 +957,14 @@ func TestWindow(t *testing.T) {
|
|
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
|
|
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
|
|
createStreams(t)
|
|
createStreams(t)
|
|
defer dropStreams(t)
|
|
defer dropStreams(t)
|
|
- done := make(chan struct{})
|
|
|
|
- defer close(done)
|
|
|
|
- common.ResetMockTicker()
|
|
|
|
for i, tt := range tests {
|
|
for i, tt := range tests {
|
|
|
|
+ test.ResetClock(1541152486000)
|
|
p := NewRuleProcessor(DbDir)
|
|
p := NewRuleProcessor(DbDir)
|
|
parser := xsql.NewParser(strings.NewReader(tt.sql))
|
|
parser := xsql.NewParser(strings.NewReader(tt.sql))
|
|
- var sources []*nodes.SourceNode
|
|
|
|
|
|
+ var (
|
|
|
|
+ sources []*nodes.SourceNode
|
|
|
|
+ syncs []chan int
|
|
|
|
+ )
|
|
if stmt, err := xsql.Language.Parse(parser); err != nil {
|
|
if stmt, err := xsql.Language.Parse(parser); err != nil {
|
|
t.Errorf("parse sql %s error: %s", tt.sql, err)
|
|
t.Errorf("parse sql %s error: %s", tt.sql, err)
|
|
} else {
|
|
} else {
|
|
@@ -985,7 +973,9 @@ func TestWindow(t *testing.T) {
|
|
} else {
|
|
} else {
|
|
streams := xsql.GetStreams(selectStmt)
|
|
streams := xsql.GetStreams(selectStmt)
|
|
for _, stream := range streams {
|
|
for _, stream := range streams {
|
|
- source := getMockSource(stream, done, tt.size)
|
|
|
|
|
|
+ next := make(chan int)
|
|
|
|
+ syncs = append(syncs, next)
|
|
|
|
+ source := getMockSource(stream, next, tt.size)
|
|
sources = append(sources, source)
|
|
sources = append(sources, source)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -997,25 +987,36 @@ func TestWindow(t *testing.T) {
|
|
mockSink := test.NewMockSink()
|
|
mockSink := test.NewMockSink()
|
|
sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
|
|
sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
|
|
tp.AddSink(inputs, sink)
|
|
tp.AddSink(inputs, sink)
|
|
- count := len(sources)
|
|
|
|
errCh := tp.Open()
|
|
errCh := tp.Open()
|
|
func() {
|
|
func() {
|
|
- for {
|
|
|
|
|
|
+ for i := 0; i < tt.size*len(syncs); i++ {
|
|
|
|
+ syncs[i%len(syncs)] <- i
|
|
|
|
+ for {
|
|
|
|
+ time.Sleep(1)
|
|
|
|
+ if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ }
|
|
select {
|
|
select {
|
|
case err = <-errCh:
|
|
case err = <-errCh:
|
|
t.Log(err)
|
|
t.Log(err)
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
return
|
|
return
|
|
- case <-done:
|
|
|
|
- count--
|
|
|
|
- log.Infof("%d sources remaining", count)
|
|
|
|
- if count <= 0 {
|
|
|
|
- log.Info("stream stopping")
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
default:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ retry := 100
|
|
|
|
+ for ; retry > 0; retry-- {
|
|
|
|
+ if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ t.Logf("wait to try another %d times", retry)
|
|
|
|
+ time.Sleep(time.Duration(retry) * time.Millisecond)
|
|
|
|
+ }
|
|
|
|
+ if retry == 0 {
|
|
|
|
+ err := compareMetrics(tp, tt.m, tt.sql)
|
|
|
|
+ t.Errorf("could not get correct metrics: %v", err)
|
|
|
|
+ }
|
|
}()
|
|
}()
|
|
results := mockSink.GetResults()
|
|
results := mockSink.GetResults()
|
|
var maps [][]map[string]interface{}
|
|
var maps [][]map[string]interface{}
|
|
@@ -1031,43 +1032,21 @@ func TestWindow(t *testing.T) {
|
|
if !reflect.DeepEqual(tt.r, maps) {
|
|
if !reflect.DeepEqual(tt.r, maps) {
|
|
t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
|
|
t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
|
|
}
|
|
}
|
|
- keys, values := tp.GetMetrics()
|
|
|
|
- for k, v := range tt.m {
|
|
|
|
- var(
|
|
|
|
- index int
|
|
|
|
- key string
|
|
|
|
- matched bool
|
|
|
|
- )
|
|
|
|
- for index, key = range keys{
|
|
|
|
- if k == key {
|
|
|
|
- if values[index] == v{
|
|
|
|
- matched = true
|
|
|
|
- }
|
|
|
|
- break
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if matched{
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- //do not find
|
|
|
|
- if index < len(values){
|
|
|
|
- t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", i, tt.sql, k, v, v, values[index], values[index])
|
|
|
|
- }else{
|
|
|
|
- t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", i, tt.sql, k, v)
|
|
|
|
- }
|
|
|
|
- break
|
|
|
|
|
|
+ if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
|
|
|
|
+ t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
|
|
}
|
|
}
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func createEventStreams(t *testing.T) {
|
|
func createEventStreams(t *testing.T) {
|
|
|
|
+ p := NewStreamProcessor(path.Join(DbDir, "stream"))
|
|
demo := `CREATE STREAM demoE (
|
|
demo := `CREATE STREAM demoE (
|
|
color STRING,
|
|
color STRING,
|
|
size BIGINT,
|
|
size BIGINT,
|
|
ts BIGINT
|
|
ts BIGINT
|
|
) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
|
|
) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
|
|
- _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err := p.ExecStmt(demo)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
@@ -1076,7 +1055,7 @@ func createEventStreams(t *testing.T) {
|
|
hum BIGINT,
|
|
hum BIGINT,
|
|
ts BIGINT
|
|
ts BIGINT
|
|
) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
|
|
) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
|
|
- _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err = p.ExecStmt(demo1)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
@@ -1085,31 +1064,32 @@ func createEventStreams(t *testing.T) {
|
|
hum BIGINT,
|
|
hum BIGINT,
|
|
ts BIGINT
|
|
ts BIGINT
|
|
) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
|
|
) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
|
|
- _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err = p.ExecStmt(sessionDemo)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func dropEventStreams(t *testing.T) {
|
|
func dropEventStreams(t *testing.T) {
|
|
|
|
+ p := NewStreamProcessor(path.Join(DbDir, "stream"))
|
|
demo := `DROP STREAM demoE`
|
|
demo := `DROP STREAM demoE`
|
|
- _, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err := p.ExecStmt(demo)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
demo1 := `DROP STREAM demo1E`
|
|
demo1 := `DROP STREAM demo1E`
|
|
- _, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err = p.ExecStmt(demo1)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
sessionDemo := `DROP STREAM sessionDemoE`
|
|
sessionDemo := `DROP STREAM sessionDemoE`
|
|
- _, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
|
|
|
|
|
|
+ _, err = p.ExecStmt(sessionDemo)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.Log(err)
|
|
t.Log(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode {
|
|
|
|
|
|
+func getEventMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
|
|
var data []*xsql.Tuple
|
|
var data []*xsql.Tuple
|
|
switch name {
|
|
switch name {
|
|
case "demoE":
|
|
case "demoE":
|
|
@@ -1344,7 +1324,6 @@ func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.Sour
|
|
}
|
|
}
|
|
|
|
|
|
func TestEventWindow(t *testing.T) {
|
|
func TestEventWindow(t *testing.T) {
|
|
- common.IsTesting = true
|
|
|
|
var tests = []struct {
|
|
var tests = []struct {
|
|
name string
|
|
name string
|
|
sql string
|
|
sql string
|
|
@@ -1404,9 +1383,9 @@ func TestEventWindow(t *testing.T) {
|
|
"op_project_0_records_in_total": int64(5),
|
|
"op_project_0_records_in_total": int64(5),
|
|
"op_project_0_records_out_total": int64(5),
|
|
"op_project_0_records_out_total": int64(5),
|
|
|
|
|
|
- "sink_MockSink_0_exceptions_total": int64(0),
|
|
|
|
- "sink_MockSink_0_records_in_total": int64(5),
|
|
|
|
- "sink_MockSink_0_records_out_total": int64(5),
|
|
|
|
|
|
+ "sink_mockSink_0_exceptions_total": int64(0),
|
|
|
|
+ "sink_mockSink_0_records_in_total": int64(5),
|
|
|
|
+ "sink_mockSink_0_records_out_total": int64(5),
|
|
|
|
|
|
"source_demoE_0_exceptions_total": int64(0),
|
|
"source_demoE_0_exceptions_total": int64(0),
|
|
"source_demoE_0_records_in_total": int64(6),
|
|
"source_demoE_0_records_in_total": int64(6),
|
|
@@ -1442,9 +1421,9 @@ func TestEventWindow(t *testing.T) {
|
|
"op_project_0_records_in_total": int64(2),
|
|
"op_project_0_records_in_total": int64(2),
|
|
"op_project_0_records_out_total": int64(2),
|
|
"op_project_0_records_out_total": int64(2),
|
|
|
|
|
|
- "sink_MockSink_0_exceptions_total": int64(0),
|
|
|
|
- "sink_MockSink_0_records_in_total": int64(2),
|
|
|
|
- "sink_MockSink_0_records_out_total": int64(2),
|
|
|
|
|
|
+ "sink_mockSink_0_exceptions_total": int64(0),
|
|
|
|
+ "sink_mockSink_0_records_in_total": int64(2),
|
|
|
|
+ "sink_mockSink_0_records_out_total": int64(2),
|
|
|
|
|
|
"source_demoE_0_exceptions_total": int64(0),
|
|
"source_demoE_0_exceptions_total": int64(0),
|
|
"source_demoE_0_records_in_total": int64(6),
|
|
"source_demoE_0_records_in_total": int64(6),
|
|
@@ -1511,9 +1490,9 @@ func TestEventWindow(t *testing.T) {
|
|
"op_project_0_records_in_total": int64(5),
|
|
"op_project_0_records_in_total": int64(5),
|
|
"op_project_0_records_out_total": int64(5),
|
|
"op_project_0_records_out_total": int64(5),
|
|
|
|
|
|
- "sink_MockSink_0_exceptions_total": int64(0),
|
|
|
|
- "sink_MockSink_0_records_in_total": int64(5),
|
|
|
|
- "sink_MockSink_0_records_out_total": int64(5),
|
|
|
|
|
|
+ "sink_mockSink_0_exceptions_total": int64(0),
|
|
|
|
+ "sink_mockSink_0_records_in_total": int64(5),
|
|
|
|
+ "sink_mockSink_0_records_out_total": int64(5),
|
|
|
|
|
|
"source_demoE_0_exceptions_total": int64(0),
|
|
"source_demoE_0_exceptions_total": int64(0),
|
|
"source_demoE_0_records_in_total": int64(6),
|
|
"source_demoE_0_records_in_total": int64(6),
|
|
@@ -1567,9 +1546,9 @@ func TestEventWindow(t *testing.T) {
|
|
"op_project_0_records_in_total": int64(4),
|
|
"op_project_0_records_in_total": int64(4),
|
|
"op_project_0_records_out_total": int64(4),
|
|
"op_project_0_records_out_total": int64(4),
|
|
|
|
|
|
- "sink_MockSink_0_exceptions_total": int64(0),
|
|
|
|
- "sink_MockSink_0_records_in_total": int64(4),
|
|
|
|
- "sink_MockSink_0_records_out_total": int64(4),
|
|
|
|
|
|
+ "sink_mockSink_0_exceptions_total": int64(0),
|
|
|
|
+ "sink_mockSink_0_records_in_total": int64(4),
|
|
|
|
+ "sink_mockSink_0_records_out_total": int64(4),
|
|
|
|
|
|
"source_demoE_0_exceptions_total": int64(0),
|
|
"source_demoE_0_exceptions_total": int64(0),
|
|
"source_demoE_0_records_in_total": int64(6),
|
|
"source_demoE_0_records_in_total": int64(6),
|
|
@@ -1628,9 +1607,9 @@ func TestEventWindow(t *testing.T) {
|
|
"op_project_0_records_in_total": int64(4),
|
|
"op_project_0_records_in_total": int64(4),
|
|
"op_project_0_records_out_total": int64(4),
|
|
"op_project_0_records_out_total": int64(4),
|
|
|
|
|
|
- "sink_MockSink_0_exceptions_total": int64(0),
|
|
|
|
- "sink_MockSink_0_records_in_total": int64(4),
|
|
|
|
- "sink_MockSink_0_records_out_total": int64(4),
|
|
|
|
|
|
+ "sink_mockSink_0_exceptions_total": int64(0),
|
|
|
|
+ "sink_mockSink_0_records_in_total": int64(4),
|
|
|
|
+ "sink_mockSink_0_records_out_total": int64(4),
|
|
|
|
|
|
"source_sessionDemoE_0_exceptions_total": int64(0),
|
|
"source_sessionDemoE_0_exceptions_total": int64(0),
|
|
"source_sessionDemoE_0_records_in_total": int64(12),
|
|
"source_sessionDemoE_0_records_in_total": int64(12),
|
|
@@ -1679,9 +1658,9 @@ func TestEventWindow(t *testing.T) {
|
|
"op_project_0_records_in_total": int64(5),
|
|
"op_project_0_records_in_total": int64(5),
|
|
"op_project_0_records_out_total": int64(5),
|
|
"op_project_0_records_out_total": int64(5),
|
|
|
|
|
|
- "sink_MockSink_0_exceptions_total": int64(0),
|
|
|
|
- "sink_MockSink_0_records_in_total": int64(5),
|
|
|
|
- "sink_MockSink_0_records_out_total": int64(5),
|
|
|
|
|
|
+ "sink_mockSink_0_exceptions_total": int64(0),
|
|
|
|
+ "sink_mockSink_0_records_in_total": int64(5),
|
|
|
|
+ "sink_mockSink_0_records_out_total": int64(5),
|
|
|
|
|
|
"source_demoE_0_exceptions_total": int64(0),
|
|
"source_demoE_0_exceptions_total": int64(0),
|
|
"source_demoE_0_records_in_total": int64(6),
|
|
"source_demoE_0_records_in_total": int64(6),
|
|
@@ -1705,36 +1684,14 @@ func TestEventWindow(t *testing.T) {
|
|
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
|
|
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
|
|
createEventStreams(t)
|
|
createEventStreams(t)
|
|
defer dropEventStreams(t)
|
|
defer dropEventStreams(t)
|
|
- done := make(chan struct{})
|
|
|
|
- defer close(done)
|
|
|
|
- common.ResetMockTicker()
|
|
|
|
- //mock ticker
|
|
|
|
- realTicker := time.NewTicker(500 * time.Millisecond)
|
|
|
|
- tickerDone := make(chan bool)
|
|
|
|
- go func() {
|
|
|
|
- ticker := common.GetTicker(1000).(*common.MockTicker)
|
|
|
|
- timer := common.GetTimer(1000).(*common.MockTimer)
|
|
|
|
- for {
|
|
|
|
- select {
|
|
|
|
- case <-tickerDone:
|
|
|
|
- log.Infof("real ticker exiting...")
|
|
|
|
- return
|
|
|
|
- case t := <-realTicker.C:
|
|
|
|
- ts := common.TimeToUnixMilli(t)
|
|
|
|
- if ticker != nil {
|
|
|
|
- go ticker.DoTick(ts)
|
|
|
|
- }
|
|
|
|
- if timer != nil {
|
|
|
|
- go timer.DoTick(ts)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- }()
|
|
|
|
for i, tt := range tests {
|
|
for i, tt := range tests {
|
|
|
|
+ test.ResetClock(1541152486000)
|
|
p := NewRuleProcessor(DbDir)
|
|
p := NewRuleProcessor(DbDir)
|
|
parser := xsql.NewParser(strings.NewReader(tt.sql))
|
|
parser := xsql.NewParser(strings.NewReader(tt.sql))
|
|
- var sources []*nodes.SourceNode
|
|
|
|
|
|
+ var (
|
|
|
|
+ sources []*nodes.SourceNode
|
|
|
|
+ syncs []chan int
|
|
|
|
+ )
|
|
if stmt, err := xsql.Language.Parse(parser); err != nil {
|
|
if stmt, err := xsql.Language.Parse(parser); err != nil {
|
|
t.Errorf("parse sql %s error: %s", tt.sql, err)
|
|
t.Errorf("parse sql %s error: %s", tt.sql, err)
|
|
} else {
|
|
} else {
|
|
@@ -1743,7 +1700,9 @@ func TestEventWindow(t *testing.T) {
|
|
} else {
|
|
} else {
|
|
streams := xsql.GetStreams(selectStmt)
|
|
streams := xsql.GetStreams(selectStmt)
|
|
for _, stream := range streams {
|
|
for _, stream := range streams {
|
|
- source := getEventMockSource(stream, done, tt.size)
|
|
|
|
|
|
+ next := make(chan int)
|
|
|
|
+ syncs = append(syncs, next)
|
|
|
|
+ source := getEventMockSource(stream, next, tt.size)
|
|
sources = append(sources, source)
|
|
sources = append(sources, source)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1759,27 +1718,40 @@ func TestEventWindow(t *testing.T) {
|
|
t.Error(err)
|
|
t.Error(err)
|
|
}
|
|
}
|
|
mockSink := test.NewMockSink()
|
|
mockSink := test.NewMockSink()
|
|
- sink := nodes.NewSinkNodeWithSink("MockSink", mockSink)
|
|
|
|
|
|
+ sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
|
|
tp.AddSink(inputs, sink)
|
|
tp.AddSink(inputs, sink)
|
|
- count := len(sources)
|
|
|
|
errCh := tp.Open()
|
|
errCh := tp.Open()
|
|
func() {
|
|
func() {
|
|
- for {
|
|
|
|
|
|
+ for i := 0; i < tt.size*len(syncs); i++ {
|
|
|
|
+ syncs[i%len(syncs)] <- i
|
|
|
|
+ for {
|
|
|
|
+ time.Sleep(1)
|
|
|
|
+ if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ }
|
|
select {
|
|
select {
|
|
case err = <-errCh:
|
|
case err = <-errCh:
|
|
t.Log(err)
|
|
t.Log(err)
|
|
tp.Cancel()
|
|
tp.Cancel()
|
|
return
|
|
return
|
|
- case <-done:
|
|
|
|
- count--
|
|
|
|
- log.Infof("%d sources remaining", count)
|
|
|
|
- if count <= 0 {
|
|
|
|
- log.Info("stream stopping")
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
default:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ mockClock := test.GetMockClock()
|
|
|
|
+ mockClock.Add(1000 * time.Millisecond)
|
|
|
|
+ retry := 100
|
|
|
|
+ for ; retry > 0; retry-- {
|
|
|
|
+ if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ t.Logf("wait to try another %d times", retry)
|
|
|
|
+ time.Sleep(time.Duration(retry) * time.Millisecond)
|
|
|
|
+ }
|
|
|
|
+ if retry == 0 {
|
|
|
|
+ err := compareMetrics(tp, tt.m, tt.sql)
|
|
|
|
+ t.Errorf("could not get correct metrics: %v", err)
|
|
|
|
+ }
|
|
}()
|
|
}()
|
|
results := mockSink.GetResults()
|
|
results := mockSink.GetResults()
|
|
var maps [][]map[string]interface{}
|
|
var maps [][]map[string]interface{}
|
|
@@ -1795,40 +1767,62 @@ func TestEventWindow(t *testing.T) {
|
|
if !reflect.DeepEqual(tt.r, maps) {
|
|
if !reflect.DeepEqual(tt.r, maps) {
|
|
t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
|
|
t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
|
|
}
|
|
}
|
|
- keys, values := tp.GetMetrics()
|
|
|
|
- //for i, k := range keys{
|
|
|
|
- // log.Printf("%s:%v", k, values[i])
|
|
|
|
- //}
|
|
|
|
- for k, v := range tt.m {
|
|
|
|
- var(
|
|
|
|
- index int
|
|
|
|
- key string
|
|
|
|
- matched bool
|
|
|
|
- )
|
|
|
|
- for index, key = range keys{
|
|
|
|
- if k == key {
|
|
|
|
- if values[index] == v{
|
|
|
|
|
|
+ if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
|
|
|
|
+ t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
|
|
|
|
+ }
|
|
|
|
+ tp.Cancel()
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func getMetric(tp *xstream.TopologyNew, name string) int {
|
|
|
|
+ keys, values := tp.GetMetrics()
|
|
|
|
+ for index, key := range keys {
|
|
|
|
+ if key == name {
|
|
|
|
+ return int(values[index].(int64))
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ fmt.Println("can't find " + name)
|
|
|
|
+ return 0
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
|
|
|
|
+ keys, values := tp.GetMetrics()
|
|
|
|
+ //for i, k := range keys{
|
|
|
|
+ // log.Printf("%s:%v", k, values[i])
|
|
|
|
+ //}
|
|
|
|
+ for k, v := range m {
|
|
|
|
+ var (
|
|
|
|
+ index int
|
|
|
|
+ key string
|
|
|
|
+ matched bool
|
|
|
|
+ )
|
|
|
|
+ for index, key = range keys {
|
|
|
|
+ if k == key {
|
|
|
|
+ if strings.HasSuffix(k, "process_latency_ms") {
|
|
|
|
+ if values[index].(int64) >= v.(int64) {
|
|
matched = true
|
|
matched = true
|
|
|
|
+ continue
|
|
|
|
+ } else {
|
|
|
|
+ break
|
|
}
|
|
}
|
|
- break
|
|
|
|
}
|
|
}
|
|
|
|
+ if values[index] == v {
|
|
|
|
+ matched = true
|
|
|
|
+ }
|
|
|
|
+ break
|
|
}
|
|
}
|
|
- if matched{
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- //do not find
|
|
|
|
- if index < len(values){
|
|
|
|
- t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", i, tt.sql, k, v, v, values[index], values[index])
|
|
|
|
- }else{
|
|
|
|
- t.Errorf("%d. %q\n\nmetrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", i, tt.sql, k, v)
|
|
|
|
- }
|
|
|
|
- break
|
|
|
|
}
|
|
}
|
|
- tp.Cancel()
|
|
|
|
|
|
+ if matched {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ //do not find
|
|
|
|
+ if index < len(values) {
|
|
|
|
+ return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
|
|
|
|
+ } else {
|
|
|
|
+ return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- realTicker.Stop()
|
|
|
|
- tickerDone <- true
|
|
|
|
- close(tickerDone)
|
|
|
|
|
|
+ return nil
|
|
}
|
|
}
|
|
|
|
|
|
func errstring(err error) string {
|
|
func errstring(err error) string {
|