Browse Source

feat(lookup): lookup table runtime

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 năm trước cách đây
mục cha
commit
f9e9e58688

+ 2 - 0
etc/sources/memory.yaml

@@ -0,0 +1,2 @@
+default:
+  index: ["id"]

+ 32 - 25
internal/plugin/native/manager.go

@@ -55,7 +55,7 @@ type Manager struct {
 	// A map from function name to its plugin file name. It is constructed during initialization by reading kv info. All functions must have at least an entry, even the function resizes in a one function plugin.
 	symbols map[string]string
 	// loaded symbols in current runtime
-	runtime map[string]plugin.Symbol
+	runtime map[string]*plugin.Plugin
 	// dirs
 	pluginDir string
 	etcDir    string
@@ -77,7 +77,7 @@ func InitManager() (*Manager, error) {
 	if err != nil {
 		return nil, fmt.Errorf("error when opening db: %v", err)
 	}
-	registry := &Manager{symbols: make(map[string]string), db: db, pluginDir: pluginDir, etcDir: etcDir, runtime: make(map[string]plugin.Symbol)}
+	registry := &Manager{symbols: make(map[string]string), db: db, pluginDir: pluginDir, etcDir: etcDir, runtime: make(map[string]*plugin.Plugin)}
 	manager = registry
 	plugins := make([]map[string]string, 3)
 	for i := range plugin2.PluginTypes {
@@ -116,7 +116,7 @@ func findAll(t plugin2.PluginType, pluginDir string) (result map[string]string,
 			n, v := parseName(baseName)
 			//load the plugins when ekuiper set up
 			if !conf.IsTesting {
-				if _, err := manager.loadRuntime(t, n, path.Join(dir, baseName)); err != nil {
+				if _, err := manager.loadRuntime(t, n, path.Join(dir, baseName), ""); err != nil {
 					continue
 				}
 			}
@@ -513,7 +513,7 @@ func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []
 
 	if !conf.IsTesting {
 		// load the runtime first
-		_, err = manager.loadRuntime(t, soName, soPath)
+		_, err = manager.loadRuntime(t, soName, soPath, "")
 		if err != nil {
 			return version, err
 		}
@@ -526,7 +526,7 @@ func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []
 // binder factory implementations
 
 func (rr *Manager) Source(name string) (api.Source, error) {
-	nf, err := rr.loadRuntime(plugin2.SOURCE, name, "")
+	nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", "")
 	if err != nil {
 		return nil, err
 	}
@@ -544,7 +544,7 @@ func (rr *Manager) Source(name string) (api.Source, error) {
 }
 
 func (rr *Manager) LookupSource(name string) (api.LookupSource, error) {
-	nf, err := rr.loadRuntime(plugin2.SOURCE, name+"Lookup", "")
+	nf, err := rr.loadRuntime(plugin2.SOURCE, name, "", ucFirst(name)+"Lookup")
 	if err != nil {
 		return nil, err
 	}
@@ -562,7 +562,7 @@ func (rr *Manager) LookupSource(name string) (api.LookupSource, error) {
 }
 
 func (rr *Manager) Sink(name string) (api.Sink, error) {
-	nf, err := rr.loadRuntime(plugin2.SINK, name, "")
+	nf, err := rr.loadRuntime(plugin2.SINK, name, "", "")
 	if err != nil {
 		return nil, err
 	}
@@ -582,7 +582,7 @@ func (rr *Manager) Sink(name string) (api.Sink, error) {
 }
 
 func (rr *Manager) Function(name string) (api.Function, error) {
-	nf, err := rr.loadRuntime(plugin2.FUNCTION, name, "")
+	nf, err := rr.loadRuntime(plugin2.FUNCTION, name, "", "")
 	if err != nil {
 		return nil, err
 	}
@@ -615,43 +615,50 @@ func (rr *Manager) ConvName(name string) (string, bool) {
 }
 
 // If not found, return nil,nil; Other errors return nil, err
-func (rr *Manager) loadRuntime(t plugin2.PluginType, name, soFilepath string) (plugin.Symbol, error) {
-	ut := ucFirst(name)
+func (rr *Manager) loadRuntime(t plugin2.PluginType, soName, soFilepath, symbolName string) (plugin.Symbol, error) {
 	ptype := plugin2.PluginTypes[t]
-	key := ptype + "/" + name
-	var nf plugin.Symbol
+	key := ptype + "/" + soName
+	var (
+		plug *plugin.Plugin
+		ok   bool
+		err  error
+	)
 	rr.RLock()
-	nf, ok := rr.runtime[key]
+	plug, ok = rr.runtime[key]
 	rr.RUnlock()
 	if !ok {
 		var soPath string
 		if soFilepath != "" {
 			soPath = soFilepath
 		} else {
-			mod, err := rr.getSoFilePath(t, name, false)
+			mod, err := rr.getSoFilePath(t, soName, false)
 			if err != nil {
-				conf.Log.Warnf(fmt.Sprintf("cannot find the native plugin %s in path: %v", name, err))
+				conf.Log.Warnf(fmt.Sprintf("cannot find the native plugin %s in path: %v", soName, err))
 				return nil, nil
 			}
 			soPath = mod
 		}
 		conf.Log.Debugf("Opening plugin %s", soPath)
-		plug, err := plugin.Open(soPath)
+		plug, err = plugin.Open(soPath)
 		if err != nil {
-			conf.Log.Errorf(fmt.Sprintf("plugin %s open error: %v", name, err))
+			conf.Log.Errorf(fmt.Sprintf("plugin %s open error: %v", soName, err))
 			return nil, fmt.Errorf("cannot open %s: %v", soPath, err)
 		}
-		conf.Log.Debugf("Successfully open plugin %s", soPath)
-		nf, err = plug.Lookup(ut)
-		if err != nil {
-			conf.Log.Warnf(fmt.Sprintf("cannot find symbol %s, please check if it is exported: %v", ut, err))
-			return nil, nil
-		}
-		conf.Log.Debugf("Successfully look-up plugin %s", soPath)
 		rr.Lock()
-		rr.runtime[key] = nf
+		rr.runtime[key] = plug
 		rr.Unlock()
+		conf.Log.Debugf("Successfully open plugin %s", soPath)
+	}
+	if symbolName == "" {
+		symbolName = ucFirst(soName)
+	}
+	conf.Log.Debugf("Loading symbol %s", symbolName)
+	nf, err := plug.Lookup(symbolName)
+	if err != nil {
+		conf.Log.Warnf(fmt.Sprintf("cannot find symbol %s, please check if it is exported: %v", symbolName, err))
+		return nil, nil
 	}
+	conf.Log.Debugf("Successfully look-up plugin %s", symbolName)
 	return nf, nil
 }
 

+ 49 - 0
internal/processor/stream.go

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
+	"github.com/lf-edge/ekuiper/internal/topo/lookup"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
@@ -99,7 +100,49 @@ func (p *StreamProcessor) ExecStmt(statement string) (result []string, err error
 	return
 }
 
+func (p *StreamProcessor) RecoverLookupTable() error {
+	keys, err := p.db.Keys()
+	if err != nil {
+		return fmt.Errorf("error loading data from db: %v.", err)
+	}
+	var (
+		v  string
+		vs = &xsql.StreamInfo{}
+	)
+	for _, k := range keys {
+		if ok, _ := p.db.Get(k, &v); ok {
+			if err := json.Unmarshal([]byte(v), vs); err == nil && vs.StreamType == ast.TypeTable {
+				parser := xsql.NewParser(strings.NewReader(vs.Statement))
+				stmt, e := xsql.Language.Parse(parser)
+				if e != nil {
+					log.Error(err)
+				}
+				switch s := stmt.(type) {
+				case *ast.StreamStmt:
+					log.Infof("Starting lookup table %s", s.Name)
+					e = lookup.CreateInstance(string(s.Name), s.Options.TYPE, s.Options)
+					if err != nil {
+						log.Errorf("%s", err.Error())
+						return err
+					}
+				default:
+					log.Errorf("Invalid lookup table statement: %s", vs.Statement)
+				}
+
+			}
+		}
+	}
+	return nil
+}
+
 func (p *StreamProcessor) execSave(stmt *ast.StreamStmt, statement string, replace bool) error {
+	if stmt.StreamType == ast.TypeTable && stmt.Options.KIND == ast.StreamKindLookup {
+		log.Infof("Creating lookup table %s", stmt.Name)
+		err := lookup.CreateInstance(string(stmt.Name), stmt.Options.TYPE, stmt.Options)
+		if err != nil {
+			return err
+		}
+	}
 	s, err := json.Marshal(xsql.StreamInfo{
 		StreamType: stmt.StreamType,
 		Statement:  statement,
@@ -281,6 +324,12 @@ func (p *StreamProcessor) execDrop(stmt ast.NameNode, st ast.StreamType) (string
 }
 
 func (p *StreamProcessor) DropStream(name string, st ast.StreamType) (string, error) {
+	if st == ast.TypeTable {
+		err := lookup.DropInstance(name)
+		if err != nil {
+			return "", err
+		}
+	}
 	_, err := p.getStream(name, st)
 	if err != nil {
 		return "", err

+ 2 - 0
internal/server/server.go

@@ -74,6 +74,8 @@ func StartUp(Version, LoadFileType string) {
 	meta.Bind()
 
 	registry = &RuleRegistry{internal: make(map[string]*RuleState)}
+	//Start lookup tables
+	streamProcessor.RecoverLookupTable()
 	//Start rules
 	if rules, err := ruleProcessor.GetAllRules(); err != nil {
 		logger.Infof("Start rules error: %s", err)

+ 5 - 1
internal/topo/lookup/table.go

@@ -72,16 +72,20 @@ func CreateInstance(name string, sourceType string, options *ast.Options) error
 	// Create the lookup source according to the source options
 	ns, err := io.LookupSource(sourceType)
 	if err != nil {
+		ctx.GetLogger().Error(err)
 		return err
 	}
+	ctx.GetLogger().Debugf("lookup source %s is created", sourceType)
 	err = ns.Configure(options.DATASOURCE, props)
 	if err != nil {
 		return err
 	}
+	ctx.GetLogger().Debugf("lookup source %s is configured", sourceType)
 	err = ns.Open(ctx)
 	if err != nil {
 		return err
 	}
+	ctx.GetLogger().Debugf("lookup source %s is opened", sourceType)
 	instances[name] = &info{ls: ns, count: 0}
 	return nil
 }
@@ -97,6 +101,6 @@ func DropInstance(name string) error {
 		delete(instances, name)
 		return nil
 	} else {
-		return fmt.Errorf("lookup table %s is not found", name)
+		return nil
 	}
 }

+ 98 - 0
internal/topo/lookup/table_test.go

@@ -0,0 +1,98 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package lookup
+
+import (
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"sync"
+	"testing"
+)
+
+func TestTable(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(2)
+	err := CreateInstance("test1", "memory", &ast.Options{
+		DATASOURCE: "test",
+		TYPE:       "memory",
+		KIND:       "lookup",
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	err = CreateInstance("test2", "memory", &ast.Options{
+		DATASOURCE: "test2",
+		TYPE:       "memory",
+		KIND:       "lookup",
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	go func() {
+		for i := 0; i < 3; i++ {
+			_, err := Attach("test1")
+			if err != nil {
+				t.Error(err)
+			}
+			_, err = Attach("test2")
+			if err != nil {
+				t.Error(err)
+			}
+		}
+		wg.Done()
+	}()
+	go func() {
+		for i := 0; i < 3; i++ {
+			_, err := Attach("test2")
+			if err != nil {
+				t.Error(err)
+			}
+			_, err = Attach("test1")
+			if err != nil {
+				t.Error(err)
+			}
+		}
+		wg.Done()
+	}()
+	wg.Wait()
+	if len(instances) != 2 {
+		t.Errorf("expect 2 instances, but got %d", len(instances))
+		return
+	}
+	for _, ins := range instances {
+		if ins.count != 6 {
+			t.Errorf("expect 6 count, but got %d", ins.count)
+			return
+		}
+	}
+	err = DropInstance("test1")
+	if err == nil {
+		t.Error("should have error to drop instance")
+		return
+	}
+	for i := 0; i < 6; i++ {
+		Detach("test1")
+	}
+	err = DropInstance("test1")
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	if len(instances) != 1 {
+		t.Errorf("expect 2 instances, but got %d", len(instances))
+		return
+	}
+}

+ 49 - 1
internal/topo/memory/lookupsource_test.go

@@ -25,7 +25,7 @@ import (
 	"time"
 )
 
-func TestSingleKeyLookup(t *testing.T) {
+func TestNoIndexLookup(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "test")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ls := GetLookupSource()
@@ -72,3 +72,51 @@ func TestSingleKeyLookup(t *testing.T) {
 		return
 	}
 }
+
+func TestSingleIndexLookup(t *testing.T) {
+	contextLogger := conf.Log.WithField("rule", "test2")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	ls := GetLookupSource()
+	err := ls.Configure("test2", map[string]interface{}{"index": []string{"ff"}})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	err = ls.Open(ctx)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	// wait for the source to be ready
+	time.Sleep(100 * time.Millisecond)
+	pubsub.Produce(ctx, "test2", map[string]interface{}{"ff": "value1", "gg": "value2"})
+	pubsub.Produce(ctx, "test2", map[string]interface{}{"ff": "value2", "gg": "value2"})
+	pubsub.Produce(ctx, "test2", map[string]interface{}{"ff": "value1", "gg": "value4"})
+	// wait for table accumulation
+	time.Sleep(100 * time.Millisecond)
+	canctx, cancel := gocontext.WithCancel(gocontext.Background())
+	defer cancel()
+	go func() {
+		for {
+			select {
+			case <-canctx.Done():
+				return
+			case <-time.After(10 * time.Millisecond):
+				pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value4", "gg": "value2"})
+			}
+		}
+	}()
+	expected := []api.SourceTuple{
+		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test2"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test2"}),
+	}
+	result, err := ls.Lookup(ctx, []string{"ff"}, []interface{}{"value1"})
+	if !reflect.DeepEqual(result, expected) {
+		t.Errorf("expect %v but got %v", expected, result)
+	}
+	err = ls.Close(ctx)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+}