Browse Source

fix(rule): fix rule qos 1 and sink cache error (#1204)

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
superxan 3 years ago
parent
commit
5268084a32

+ 31 - 7
internal/pkg/store/redis/redisKv.go

@@ -112,6 +112,18 @@ func (kv redisKvStore) Delete(key string) error {
 }
 
 func (kv redisKvStore) Keys() ([]string, error) {
+	keys, err := kv.metaKeys()
+	if err != nil {
+		return nil, err
+	}
+	result := make([]string, 0)
+	for _, k := range keys {
+		result = append(result, kv.trimPrefix(k))
+	}
+	return result, nil
+}
+
+func (kv redisKvStore) metaKeys() ([]string, error) {
 	keys := make([]string, 0)
 	err := kv.database.Apply(func(conn redis.Conn) error {
 		pattern := fmt.Sprintf("%s:*", kv.keyPrefix)
@@ -119,20 +131,32 @@ func (kv redisKvStore) Keys() ([]string, error) {
 		keys, err = redis.Strings(reply, err)
 		return err
 	})
-	result := make([]string, 0)
-	for _, k := range keys {
-		result = append(result, kv.trimPrefix(k))
-	}
-	return result, err
+	return keys, err
 }
 
 func (kv redisKvStore) Clean() error {
-	keys, err := kv.Keys()
+	keys, err := kv.metaKeys()
+	if err != nil {
+		return err
+	}
+	keysToRemove := make([]interface{}, len(keys))
+	for i, v := range keys {
+		keysToRemove[i] = v
+	}
+	err = kv.database.Apply(func(conn redis.Conn) error {
+		_, err := conn.Do("DEL", keysToRemove...)
+		return err
+	})
+	return err
+}
+
+func (kv redisKvStore) Drop() error {
+	keys, err := kv.metaKeys()
 	if err != nil {
 		return err
 	}
 	keysToRemove := make([]interface{}, len(keys))
-	for i, v := range keysToRemove {
+	for i, v := range keys {
 		keysToRemove[i] = v
 	}
 	err = kv.database.Apply(func(conn redis.Conn) error {

+ 8 - 0
internal/pkg/store/sql/sqlKv.go

@@ -149,3 +149,11 @@ func (kv *sqlKvStore) Clean() error {
 		return err
 	})
 }
+
+func (kv *sqlKvStore) Drop() error {
+	return kv.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("Drop table '%s';", kv.table)
+		_, err := db.Exec(query)
+		return err
+	})
+}

+ 36 - 0
internal/pkg/store/stores.go

@@ -65,6 +65,16 @@ func (s *stores) GetKV(table string) (error, kv2.KeyValue) {
 	return nil, ks
 }
 
+func (s *stores) DropKV(table string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if ks, contains := s.kv[table]; contains {
+		_ = ks.Drop()
+		delete(s.ts, table)
+	}
+}
+
 func (s *stores) GetTS(table string) (error, kv2.Tskv) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -79,6 +89,16 @@ func (s *stores) GetTS(table string) (error, kv2.Tskv) {
 	return nil, tts
 }
 
+func (s *stores) DropTS(table string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if tts, contains := s.ts[table]; contains {
+		_ = tts.Drop()
+		delete(s.ts, table)
+	}
+}
+
 var globalStores *stores = nil
 
 func InitGlobalStores(db db.Database) error {
@@ -100,3 +120,19 @@ func GetTS(table string) (error, kv2.Tskv) {
 	}
 	return globalStores.GetTS(table)
 }
+
+func DropTS(table string) error {
+	if globalStores == nil {
+		return fmt.Errorf("global stores are not initialized")
+	}
+	globalStores.DropTS(table)
+	return nil
+}
+
+func DropKV(table string) error {
+	if globalStores == nil {
+		return fmt.Errorf("global stores are not initialized")
+	}
+	globalStores.DropKV(table)
+	return nil
+}

+ 4 - 20
internal/processor/rule.go

@@ -25,9 +25,9 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/planner"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/kv"
+	"path"
 )
 
 type RuleProcessor struct {
@@ -233,33 +233,17 @@ func (p *RuleProcessor) ExecDrop(name string) (string, error) {
 }
 
 func cleanCheckpoint(name string) error {
-	err, db := store.GetTS(name)
+	err := store.DropTS(name)
 	if err != nil {
 		return err
 	}
-	return db.Drop()
+	return nil
 }
 
 func cleanSinkCache(rule *api.Rule) error {
-	err, store := store.GetKV("sink")
+	err := store.DropKV(path.Join("sink", rule.Id))
 	if err != nil {
 		return err
 	}
-	for d, m := range rule.Actions {
-		con := 1
-		for name, action := range m {
-			props, _ := action.(map[string]interface{})
-			if c, ok := props["concurrency"]; ok {
-				if t, err := cast.ToInt(c, cast.STRICT); err == nil && t > 0 {
-					con = t
-				}
-			}
-			for i := 0; i < con; i++ {
-				key := fmt.Sprintf("%s%s_%d%d", rule.Id, name, d, i)
-				conf.Log.Debugf("delete cache key %s", key)
-				store.Delete(key)
-			}
-		}
-	}
 	return nil
 }

+ 1 - 0
internal/topo/connection/clients/mqtt/mqtt_wrapper.go

@@ -232,6 +232,7 @@ func (mc *mqttClientWrapper) AddRef() {
 	mc.refLock.Lock()
 	defer mc.refLock.Unlock()
 	mc.refCnt = mc.refCnt + 1
+	conf.Log.Infof("mqtt client wrapper add refence for connection selector %s total refcount %d", mc.conSelector, mc.refCnt)
 }
 
 func (mc *mqttClientWrapper) DeRef(c api.StreamContext) {

+ 1 - 1
internal/topo/node/sink_cache.go

@@ -198,7 +198,7 @@ func (c *Cache) loadCache() error {
 
 func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
 	logger.Infof("clean the cache and reopen")
-	c.store.Clean()
+	_ = c.store.Delete(c.key)
 
 	return c.store.Set(c.key, p)
 }

+ 26 - 21
internal/topo/node/sink_node.go

@@ -195,9 +195,9 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 							}
 							stats.SetBufferLength(int64(len(m.input)))
 							if sconf.RunAsync {
-								go doCollect(ctx, sink, data, stats, sconf, nil)
+								go doCollect(ctx, sink, data, stats, sconf)
 							} else {
-								doCollect(ctx, sink, data, stats, sconf, nil)
+								doCollect(ctx, sink, data, stats, sconf)
 							}
 						case <-ctx.Done():
 							logger.Infof("sink node %s instance %d done", m.name, instance)
@@ -213,9 +213,9 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 					logger.Infof("Creating sink cache")
 					var cache *Cache
 					if m.qos >= api.AtLeastOnce {
-						cache = NewCheckpointbasedCache(m.input, sconf.CacheLength, m.tch, result, ctx)
+						cache = NewCheckpointbasedCache(m.input, sconf.CacheLength, m.tch, result, ctx.WithInstance(instance))
 					} else {
-						cache = NewTimebasedCache(m.input, sconf.CacheLength, sconf.CacheSaveInterval, result, ctx)
+						cache = NewTimebasedCache(m.input, sconf.CacheLength, sconf.CacheSaveInterval, result, ctx.WithInstance(instance))
 					}
 					for {
 						select {
@@ -227,9 +227,25 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 							}
 							stats.SetBufferLength(int64(len(m.input)))
 							if sconf.RunAsync {
-								go doCollect(ctx, sink, data, stats, sconf, cache.Complete)
+								go func() {
+									doCollect(ctx, sink, data.data, stats, sconf)
+									if cache.Complete != nil {
+										select {
+										case cache.Complete <- data.index:
+										default:
+											ctx.GetLogger().Warnf("sink cache missing response for %d", data.index)
+										}
+									}
+								}()
 							} else {
-								doCollect(ctx, sink, data, stats, sconf, cache.Complete)
+								doCollect(ctx, sink, data.data, stats, sconf)
+								if cache.Complete != nil {
+									select {
+									case cache.Complete <- data.index:
+									default:
+										ctx.GetLogger().Warnf("sink cache missing response for %d", data.index)
+									}
+								}
 							}
 						case <-ctx.Done():
 							logger.Infof("sink node %s instance %d done", m.name, instance)
@@ -252,7 +268,7 @@ func (m *SinkNode) reset() {
 	m.statManagers = nil
 }
 
-func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats StatManager, sconf *SinkConf, signalCh chan<- int) {
+func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats StatManager, sconf *SinkConf) {
 	stats.IncTotalRecordsIn()
 	stats.ProcessTimeStart()
 	defer stats.ProcessTimeEnd()
@@ -274,20 +290,20 @@ func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats Sta
 		return
 	}
 	if !sconf.SendSingle {
-		doCollectData(ctx, sink, outs, stats, sconf, signalCh)
+		doCollectData(ctx, sink, outs, stats, sconf)
 	} else {
 		for _, d := range outs {
 			if sconf.Omitempty && (d == nil || len(d) == 0) {
 				ctx.GetLogger().Debugf("receive empty in sink")
 				continue
 			}
-			doCollectData(ctx, sink, d, stats, sconf, signalCh)
+			doCollectData(ctx, sink, d, stats, sconf)
 		}
 	}
 }
 
 // doCollectData outData must be map or []map
-func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, stats StatManager, sconf *SinkConf, signalCh chan<- int) {
+func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, stats StatManager, sconf *SinkConf) {
 	retries := sconf.RetryCount
 	for {
 		select {
@@ -308,17 +324,6 @@ func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, st
 			} else {
 				ctx.GetLogger().Debugf("success")
 				stats.IncTotalRecordsOut()
-				if signalCh != nil {
-					cacheTuple, ok := outData.(*CacheTuple)
-					if !ok {
-						ctx.GetLogger().Warnf("got none cache tuple %v, should not happen", outData)
-					}
-					select {
-					case signalCh <- cacheTuple.index:
-					default:
-						ctx.GetLogger().Warnf("sink cache missing response for %d", cacheTuple.index)
-					}
-				}
 				return
 			}
 		}

+ 1 - 0
pkg/kv/kv.go

@@ -24,4 +24,5 @@ type KeyValue interface {
 	Delete(key string) error
 	Keys() (keys []string, err error)
 	Clean() error
+	Drop() error
 }