Browse Source

feat(server): support concurrent rule processing

ngjaying 5 years atrás
parent
commit
5da0491786

+ 6 - 6
xstream/server/server/rpc.go

@@ -13,7 +13,7 @@ const QUERY_RULE_ID = "internal-xstream_query_rule"
 type Server int
 
 func (t *Server) CreateQuery(sql string, reply *string) error {
-	if _, ok := registry[QUERY_RULE_ID]; ok {
+	if _, ok := registry.Load(QUERY_RULE_ID); ok {
 		stopQuery()
 	}
 	tp, err := ruleProcessor.ExecQuery(QUERY_RULE_ID, sql)
@@ -21,7 +21,7 @@ func (t *Server) CreateQuery(sql string, reply *string) error {
 		return err
 	} else {
 		rs := &RuleState{Name: QUERY_RULE_ID, Topology: tp, Triggered: true}
-		registry[QUERY_RULE_ID] = rs
+		registry.Store(QUERY_RULE_ID, rs)
 		msg := fmt.Sprintf("Query was submit successfully.")
 		logger.Println(msg)
 		*reply = fmt.Sprintf(msg)
@@ -30,10 +30,10 @@ func (t *Server) CreateQuery(sql string, reply *string) error {
 }
 
 func stopQuery() {
-	if rs, ok := registry[QUERY_RULE_ID]; ok {
+	if rs, ok := registry.Load(QUERY_RULE_ID); ok {
 		logger.Printf("stop the query.")
 		(*rs.Topology).Cancel()
-		delete(registry, QUERY_RULE_ID)
+		registry.Delete(QUERY_RULE_ID)
 	}
 }
 
@@ -41,7 +41,7 @@ func stopQuery() {
  * qid is not currently used.
  */
 func (t *Server) GetQueryResult(qid string, reply *string) error {
-	if rs, ok := registry[QUERY_RULE_ID]; ok {
+	if rs, ok := registry.Load(QUERY_RULE_ID); ok {
 		c := (*rs.Topology).GetContext()
 		if c != nil && c.Err() != nil {
 			return c.Err()
@@ -163,7 +163,7 @@ func init() {
 	go func() {
 		for {
 			<-ticker.C
-			if _, ok := registry[QUERY_RULE_ID]; !ok {
+			if _, ok := registry.Load(QUERY_RULE_ID); !ok {
 				continue
 			}
 

+ 29 - 6
xstream/server/server/ruleManager.go

@@ -7,16 +7,39 @@ import (
 	"fmt"
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream/api"
+	"sync"
 )
 
-var registry RuleRegistry
+var registry *RuleRegistry
 
 type RuleState struct {
 	Name      string
 	Topology  *xstream.TopologyNew
 	Triggered bool
 }
-type RuleRegistry map[string]*RuleState
+type RuleRegistry struct {
+	sync.RWMutex
+	internal map[string]*RuleState
+}
+
+func (rr *RuleRegistry) Store(key string, value *RuleState) {
+	rr.Lock()
+	rr.internal[key] = value
+	rr.Unlock()
+}
+
+func (rr *RuleRegistry) Load(key string) (value *RuleState, ok bool) {
+	rr.RLock()
+	result, ok := rr.internal[key]
+	rr.RUnlock()
+	return result, ok
+}
+
+func (rr *RuleRegistry) Delete(key string) {
+	rr.Lock()
+	delete(rr.internal, key)
+	rr.Unlock()
+}
 
 func createRuleState(rule *api.Rule) (*RuleState, error) {
 	if tp, err := ruleProcessor.ExecInitRule(rule); err != nil {
@@ -27,7 +50,7 @@ func createRuleState(rule *api.Rule) (*RuleState, error) {
 			Topology:  tp,
 			Triggered: true,
 		}
-		registry[rule.Id] = rs
+		registry.Store(rule.Id, rs)
 		return rs, nil
 	}
 }
@@ -48,7 +71,7 @@ func doStartRule(rs *RuleState) error {
 
 func getRuleStatus(name string) (string, error) {
 	result := ""
-	if rs, ok := registry[name]; ok {
+	if rs, ok := registry.Load(name); ok {
 		if !rs.Triggered {
 			result = "Stopped: canceled manually."
 			return result, nil
@@ -94,7 +117,7 @@ func getRuleStatus(name string) (string, error) {
 
 func startRule(name string) error {
 	var rs *RuleState
-	rs, ok := registry[name]
+	rs, ok := registry.Load(name)
 	if !ok {
 		r, err := ruleProcessor.GetRuleByName(name)
 		if err != nil {
@@ -113,7 +136,7 @@ func startRule(name string) error {
 }
 
 func stopRule(name string) (result string) {
-	if rs, ok := registry[name]; ok {
+	if rs, ok := registry.Load(name); ok {
 		(*rs.Topology).Cancel()
 		rs.Triggered = false
 		result = fmt.Sprintf("Rule %s was stopped.", name)

+ 1 - 1
xstream/server/server/server.go

@@ -32,7 +32,7 @@ func StartUp(Version string) {
 	ruleProcessor = processors.NewRuleProcessor(path.Dir(dataDir))
 	streamProcessor = processors.NewStreamProcessor(path.Join(path.Dir(dataDir), "stream"))
 
-	registry = make(RuleRegistry)
+	registry = &RuleRegistry{internal: make(map[string]*RuleState)}
 
 	server := new(Server)
 	//Start rules