|
@@ -113,6 +113,10 @@ func TestCreate(t *testing.T) {
|
|
}
|
|
}
|
|
|
|
|
|
func TestUpdate(t *testing.T) {
|
|
func TestUpdate(t *testing.T) {
|
|
|
|
+ ignoreSignal = true
|
|
|
|
+ defer func() {
|
|
|
|
+ ignoreSignal = false
|
|
|
|
+ }()
|
|
sp := processor.NewStreamProcessor()
|
|
sp := processor.NewStreamProcessor()
|
|
sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
|
|
sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
|
|
defer sp.ExecStmt(`DROP STREAM demo`)
|
|
defer sp.ExecStmt(`DROP STREAM demo`)
|
|
@@ -184,7 +188,7 @@ func TestUpdate(t *testing.T) {
|
|
}
|
|
}
|
|
for i, tt := range tests {
|
|
for i, tt := range tests {
|
|
rs, err := NewRuleState(&api.Rule{
|
|
rs, err := NewRuleState(&api.Rule{
|
|
- Triggered: false,
|
|
|
|
|
|
+ Triggered: true,
|
|
Id: "test",
|
|
Id: "test",
|
|
Sql: "SELECT ts FROM demo",
|
|
Sql: "SELECT ts FROM demo",
|
|
Actions: []map[string]interface{}{
|
|
Actions: []map[string]interface{}{
|
|
@@ -197,7 +201,10 @@ func TestUpdate(t *testing.T) {
|
|
require.NoError(t, err)
|
|
require.NoError(t, err)
|
|
err = rs.Start()
|
|
err = rs.Start()
|
|
require.NoError(t, err)
|
|
require.NoError(t, err)
|
|
|
|
+ time.Sleep(5 * time.Millisecond)
|
|
|
|
+ require.Equal(t, 1, rs.triggered, fmt.Sprintf("case %v failed", i))
|
|
err = rs.UpdateTopo(tt.r)
|
|
err = rs.UpdateTopo(tt.r)
|
|
|
|
+ time.Sleep(5 * time.Millisecond)
|
|
require.Equal(t, tt.e, err, fmt.Sprintf("case %v failed", i))
|
|
require.Equal(t, tt.e, err, fmt.Sprintf("case %v failed", i))
|
|
require.Equal(t, tt.triggered, rs.triggered, fmt.Sprintf("case %v failed", i))
|
|
require.Equal(t, tt.triggered, rs.triggered, fmt.Sprintf("case %v failed", i))
|
|
rs.Close()
|
|
rs.Close()
|