|
@@ -12,7 +12,7 @@
|
|
|
// See the License for the specific language governing permissions and
|
|
|
// limitations under the License.
|
|
|
|
|
|
-package topotest
|
|
|
+package plugin
|
|
|
|
|
|
import (
|
|
|
"bufio"
|
|
@@ -24,6 +24,7 @@ import (
|
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
|
"github.com/lf-edge/ekuiper/internal/plugin/native"
|
|
|
"github.com/lf-edge/ekuiper/internal/topo/planner"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/topo/topotest"
|
|
|
"github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
|
|
|
"github.com/lf-edge/ekuiper/pkg/api"
|
|
|
"os"
|
|
@@ -57,7 +58,7 @@ func TestExtensions(t *testing.T) {
|
|
|
log := conf.Log
|
|
|
//Reset
|
|
|
streamList := []string{"ext", "ext2"}
|
|
|
- HandleStream(false, streamList, t)
|
|
|
+ topotest.HandleStream(false, streamList, t)
|
|
|
os.Remove(CACHE_FILE)
|
|
|
os.Create(CACHE_FILE)
|
|
|
var tests = []struct {
|
|
@@ -76,12 +77,12 @@ func TestExtensions(t *testing.T) {
|
|
|
maxLength: 2,
|
|
|
},
|
|
|
}
|
|
|
- HandleStream(true, streamList, t)
|
|
|
+ topotest.HandleStream(true, streamList, t)
|
|
|
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
|
|
|
for i, tt := range tests {
|
|
|
mockclock.ResetClock(1541152486000)
|
|
|
// Create rule
|
|
|
- rs, err := CreateRule(tt.name, tt.rj)
|
|
|
+ rs, err := topotest.CreateRule(tt.name, tt.rj)
|
|
|
if err != nil {
|
|
|
t.Errorf("failed to create rule: %s.", err)
|
|
|
continue
|
|
@@ -175,9 +176,9 @@ func getResults() []string {
|
|
|
func TestFuncState(t *testing.T) {
|
|
|
//Reset
|
|
|
streamList := []string{"text"}
|
|
|
- HandleStream(false, streamList, t)
|
|
|
+ topotest.HandleStream(false, streamList, t)
|
|
|
//Data setup
|
|
|
- var tests = []RuleTest{
|
|
|
+ var tests = []topotest.RuleTest{
|
|
|
{
|
|
|
Name: `TestFuncStateRule1`,
|
|
|
Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
|
|
@@ -223,8 +224,8 @@ func TestFuncState(t *testing.T) {
|
|
|
},
|
|
|
},
|
|
|
}
|
|
|
- HandleStream(true, streamList, t)
|
|
|
- DoRuleTest(t, tests, 0, &api.RuleOption{
|
|
|
+ topotest.HandleStream(true, streamList, t)
|
|
|
+ topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
|
|
|
BufferLength: 100,
|
|
|
SendError: true,
|
|
|
}, 0)
|
|
@@ -232,10 +233,10 @@ func TestFuncState(t *testing.T) {
|
|
|
|
|
|
func TestFuncStateCheckpoint(t *testing.T) {
|
|
|
streamList := []string{"text"}
|
|
|
- HandleStream(false, streamList, t)
|
|
|
- var tests = []RuleCheckpointTest{
|
|
|
+ topotest.HandleStream(false, streamList, t)
|
|
|
+ var tests = []topotest.RuleCheckpointTest{
|
|
|
{
|
|
|
- RuleTest: RuleTest{
|
|
|
+ RuleTest: topotest.RuleTest{
|
|
|
Name: `TestFuncStateCheckpointRule1`,
|
|
|
Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
|
|
|
R: [][]map[string]interface{}{
|
|
@@ -300,8 +301,8 @@ func TestFuncStateCheckpoint(t *testing.T) {
|
|
|
},
|
|
|
},
|
|
|
}
|
|
|
- HandleStream(true, streamList, t)
|
|
|
- DoCheckpointRuleTest(t, tests, 0, &api.RuleOption{
|
|
|
+ topotest.HandleStream(true, streamList, t)
|
|
|
+ topotest.DoCheckpointRuleTest(t, tests, 0, &api.RuleOption{
|
|
|
BufferLength: 100,
|
|
|
Qos: api.AtLeastOnce,
|
|
|
CheckpointInterval: 2000,
|