Browse Source

feat: let `startRule` rerun the rule (#1967)

* support rerun rule

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 year ago
parent
commit
54a03591ee
2 changed files with 59 additions and 6 deletions
  1. 53 0
      internal/server/rest_test.go
  2. 6 6
      internal/server/rule_manager.go

+ 53 - 0
internal/server/rest_test.go

@@ -30,6 +30,7 @@ import (
 	"github.com/stretchr/testify/suite"
 
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/model"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/topo/rule"
@@ -393,3 +394,55 @@ func (suite *RestTestSuite) Test_fileUpload() {
 func TestRestTestSuite(t *testing.T) {
 	suite.Run(t, new(RestTestSuite))
 }
+
+func (suite *ServerTestSuite) TestStartRuleAfterSchemaChange() {
+	sql := `Create Stream test (a bigint) WITH (DATASOURCE="../internal/server/rpc_test_data/test.json", FORMAT="JSON", type="file");`
+	var reply string
+	err := suite.s.Stream(sql, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Stream test is created.\n", reply)
+
+	reply = ""
+	rule := `{
+			  "sql": "SELECT a from test;",
+			  "actions": [{
+				"file": {
+				  "path": "../internal/server/rpc_test_data/data/result.txt",
+				  "interval": 5000,
+				  "fileType": "lines",
+				  "format": "json"
+				}
+			  }]
+	}`
+	ruleId := "myRule"
+	args := &model.RPCArgDesc{Name: ruleId, Json: rule}
+	err = suite.s.CreateRule(args, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Rule myRule was created successfully, please use 'bin/kuiper getstatus rule myRule' command to get rule status.", reply)
+
+	reply = ""
+	err = suite.s.GetStatusRule(ruleId, &reply)
+	assert.Nil(suite.T(), err)
+
+	reply = ""
+	err = suite.s.StopRule(ruleId, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Rule myRule was stopped.", reply)
+
+	reply = ""
+	sql = `drop stream test`
+	err = suite.s.Stream(sql, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Stream test is dropped.\n", reply)
+
+	reply = ""
+	sql = `Create Stream test (b bigint) WITH (DATASOURCE="../internal/server/rpc_test_data/test.json", FORMAT="JSON", type="file");`
+	err = suite.s.Stream(sql, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Stream test is created.\n", reply)
+
+	reply = ""
+	err = suite.s.StartRule(ruleId, &reply)
+	assert.Error(suite.T(), err)
+	assert.Equal(suite.T(), err.Error(), "unknown field a")
+}

+ 6 - 6
internal/server/rule_manager.go

@@ -180,16 +180,16 @@ func deleteRule(name string) (result string) {
 }
 
 func startRule(name string) error {
+	return reRunRule(name)
+}
+
+// reRunRule rerun the rule from optimize to Open the operator in order to refresh the schema
+func reRunRule(name string) error {
 	rs, ok := registry.Load(name)
 	if !ok {
 		return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
 	} else {
-		err := rs.Start()
-		if err != nil {
-			return err
-		}
-		err = ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
-		return err
+		return rs.UpdateTopo(rs.Rule)
 	}
 }