瀏覽代碼

fix(*): clean rule after deletion (#612)

* fix(topo): fix stop rule error channel overflow

* refactor(etc): disable sink cache by default

Co-authored-by: RockyJin <fahua.jin@gmail.com>
ngjaying 4 年之前
父節點
當前提交
eb407ca279

+ 1 - 1
etc/kuiper.yaml

@@ -41,4 +41,4 @@ sink:
   cacheTriggerCount: 15
 
   # Control to disable cache or not. If it's set to true, then the cache will be disabled, otherwise, it will be enabled.
-  disableCache: false
+  disableCache: true

+ 20 - 4
xstream/nodes/dynamic_channel_buffer.go

@@ -11,6 +11,7 @@ type DynamicChannelBuffer struct {
 	In     chan api.SourceTuple
 	Out    chan api.SourceTuple
 	buffer []api.SourceTuple
+	done   chan bool
 }
 
 func NewDynamicChannelBuffer() *DynamicChannelBuffer {
@@ -19,6 +20,7 @@ func NewDynamicChannelBuffer() *DynamicChannelBuffer {
 		Out:    make(chan api.SourceTuple),
 		buffer: make([]api.SourceTuple, 0),
 		limit:  102400,
+		done:   make(chan bool, 1),
 	}
 	go buffer.run()
 	return buffer
@@ -34,18 +36,28 @@ func (b *DynamicChannelBuffer) run() {
 	for {
 		l := len(b.buffer)
 		if int64(l) >= atomic.LoadInt64(&b.limit) {
-			b.Out <- b.buffer[0]
-			b.buffer = b.buffer[1:]
+			select {
+			case b.Out <- b.buffer[0]:
+				b.buffer = b.buffer[1:]
+			case <-b.done:
+				return
+			}
 		} else if l > 0 {
 			select {
 			case b.Out <- b.buffer[0]:
 				b.buffer = b.buffer[1:]
 			case value := <-b.In:
 				b.buffer = append(b.buffer, value)
+			case <-b.done:
+				return
 			}
 		} else {
-			value := <-b.In
-			b.buffer = append(b.buffer, value)
+			select {
+			case value := <-b.In:
+				b.buffer = append(b.buffer, value)
+			case <-b.done:
+				return
+			}
 		}
 	}
 }
@@ -53,3 +65,7 @@ func (b *DynamicChannelBuffer) run() {
 func (b *DynamicChannelBuffer) GetLength() int {
 	return len(b.buffer)
 }
+
+func (b *DynamicChannelBuffer) Close() {
+	b.done <- true
+}

+ 2 - 0
xstream/nodes/sink_cache.go

@@ -108,6 +108,7 @@ func (c *Cache) timebasedRun(ctx api.StreamContext, saveInterval int) {
 	logger := ctx.GetLogger()
 	c.initStore(ctx)
 	ticker := common.GetTicker(saveInterval)
+	defer ticker.Stop()
 	var tcount = 0
 	for {
 		select {
@@ -200,6 +201,7 @@ func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
 	if err != nil {
 		logger.Errorf("save cache error while opening cache store: %s", err)
 		logger.Infof("clean the cache and reopen")
+		c.store.Close()
 		c.store.Clean()
 		err = c.store.Open()
 		if err != nil {

+ 1 - 0
xstream/nodes/source_node.go

@@ -129,6 +129,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 					case <-ctx.Done():
 						logger.Infof("source %s done", m.name)
 						m.close(ctx, logger)
+						buffer.Close()
 						return
 					case err := <-sourceErrCh:
 						m.drainError(errCh, err, ctx, logger)

+ 1 - 1
xstream/server/server/rest.go

@@ -263,7 +263,7 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) {
 		}
 		jsonResponse(rule, w, logger)
 	case http.MethodDelete:
-		stopRule(name)
+		deleteRule(name)
 		content, err := ruleProcessor.ExecDrop(name)
 		if err != nil {
 			handleError(w, err, "delete rule error", logger)

+ 1 - 1
xstream/server/server/rpc.go

@@ -172,7 +172,7 @@ func (t *Server) ShowRules(_ int, reply *string) error {
 }
 
 func (t *Server) DropRule(name string, reply *string) error {
-	stopRule(name)
+	deleteRule(name)
 	r, err := ruleProcessor.ExecDrop(name)
 	if err != nil {
 		return fmt.Errorf("Drop rule error : %s.", err)

+ 21 - 4
xstream/server/server/ruleManager.go

@@ -64,10 +64,14 @@ func doStartRule(rs *RuleState) error {
 	go func() {
 		tp := rs.Topology
 		select {
-		case err := <-tp.Open():
-			tp.GetContext().SetError(err)
-			logger.Printf("closing rule %s for error: %v", rs.Name, err)
-			tp.Cancel()
+		case err, ok := <-tp.Open():
+			if ok {
+				tp.GetContext().SetError(err)
+				logger.Printf("closing rule %s for error: %v", rs.Name, err)
+				tp.Cancel()
+			} else {
+				logger.Printf("closing rule %s", rs.Name)
+			}
 		}
 	}()
 	return nil
@@ -204,6 +208,19 @@ func stopRule(name string) (result string) {
 	return
 }
 
+func deleteRule(name string) (result string) {
+	if rs, ok := registry.Load(name); ok {
+		if rs.Triggered {
+			(*rs.Topology).Cancel()
+		}
+		registry.Delete(name)
+		result = fmt.Sprintf("Rule %s was deleted.", name)
+	} else {
+		result = fmt.Sprintf("Rule %s was not found.", name)
+	}
+	return
+}
+
 func restartRule(name string) error {
 	stopRule(name)
 	return startRule(name)

+ 4 - 1
xstream/streams.go

@@ -35,7 +35,6 @@ type TopologyNew struct {
 func NewWithNameAndQos(name string, qos api.Qos, checkpointInterval int) (*TopologyNew, error) {
 	tp := &TopologyNew{
 		name:               name,
-		drain:              make(chan error),
 		qos:                qos,
 		checkpointInterval: checkpointInterval,
 		topo: &PrintableTopo{
@@ -51,6 +50,9 @@ func (s *TopologyNew) GetContext() api.StreamContext {
 }
 
 func (s *TopologyNew) Cancel() {
+	if s.drain != nil {
+		close(s.drain)
+	}
 	s.cancel()
 	s.store = nil
 	s.coordinator = nil
@@ -124,6 +126,7 @@ func (s *TopologyNew) Open() <-chan error {
 		return s.drain
 	}
 	s.prepareContext() // ensure context is set
+	s.drain = make(chan error)
 	var err error
 	if s.store, err = states.CreateStore(s.name, s.qos); err != nil {
 		s.drainErr(err)