Browse Source

fea(wasm): add wasmedge support (#1860)

* feat(wasm): support bytes & string (#1731)

* feat(wasm): support bytes & string

Signed-off-by: csh <458761603@qq.com>

* fix(wasm): add wasmedge rustlts plugin

Signed-off-by: zzz <458761603@qq.com>

---------

Signed-off-by: csh <458761603@qq.com>
Signed-off-by: zzz <458761603@qq.com>

* fix(wasm): do not call panic when error happen

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(wasm): add make cmd in makefile

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Signed-off-by: superxan <33817352+superrxan@users.noreply.github.com>

* fix(wasm): remove go version in go.work

Signed-off-by: superxan <33817352+superrxan@users.noreply.github.com>

* fix(wasm): add copy right

Signed-off-by: superxan <33817352+superrxan@users.noreply.github.com>

* fix(wasm): fix core lint issue

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(wasm): fix core lint issue

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

---------

Signed-off-by: csh <458761603@qq.com>
Signed-off-by: zzz <458761603@qq.com>
Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Signed-off-by: superxan <33817352+superrxan@users.noreply.github.com>
Co-authored-by: zzz <458761603@qq.com>
superxan 1 year ago
parent
commit
d7e8dc91e2

+ 5 - 0
.github/workflows/run_test_case.yaml

@@ -39,6 +39,11 @@ jobs:
     - name: run test case
       run: |
         curl -sSf https://raw.githubusercontent.com/WasmEdge/WasmEdge/master/utils/install.sh | bash > /dev/null && source $HOME/.wasmedge/env
+        
+        wget https://github.com/second-state/wasmedge_rustls_plugin/releases/download/0.1.0/WasmEdge-plugin-wasmedge_rustls-0.1.0-alpha-ubuntu20.04_x86_64.tar
+        tar -xf "WasmEdge-plugin-wasmedge_rustls-0.1.0-alpha-ubuntu20.04_x86_64.tar"
+        mv libwasmedge_rustls.so ~/.wasmedge/plugin/
+
         mkdir -p plugins/wasm
         set -e -u -x
         mkdir -p data

+ 9 - 0
Makefile

@@ -92,6 +92,15 @@ real_pkg:
 	@mv $(BUILD_PATH)/$(PACKAGE_NAME).zip $(BUILD_PATH)/$(PACKAGE_NAME).tar.gz $(PACKAGES_PATH)
 	@echo "Package build success"
 
+.PHONY: build_with_wasm
+build_with_wasm: build_prepare
+	GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiper cmd/kuiper/main.go
+	GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "wasmedge" -o kuiperd cmd/kuiperd/main.go
+	@if [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi
+	@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
+	@echo "Build successfully"
+
+
 .PHONY: docker
 docker:
 	docker buildx build --no-cache --platform=linux/amd64 -t $(TARGET):$(VERSION) -f deploy/docker/Dockerfile . --load

+ 7 - 0
docs/en_US/extension/wasm/overview.md

@@ -104,6 +104,13 @@ fibonacci.json
   "wasmEngine": "wasmedge"
 }
 ```
+## Build eKuiper
+
+The official released eKuiper do not have wasm support, users need build eKuiper by himself
+
+```shell
+make build_with_wasm
+```
 
 Install the plugin:
 

+ 9 - 0
docs/zh_CN/extension/wasm/overview.md

@@ -95,6 +95,15 @@ fibonacci.json
   "wasmEngine": "wasmedge"
 }
 ```
+## 编译 eKuiper
+
+目前官方发布的 eKuiper 并不支持 wasm, 用户需要自行编译。
+
+
+```shell
+make build_with_wasm
+```
+
 安装插件:
 
 首先启动服务器

+ 2 - 0
internal/plugin/plugin.go

@@ -84,6 +84,8 @@ func NewPluginByType(t PluginType) Plugin {
 	switch t {
 	case FUNCTION:
 		return &FuncPlugin{}
+	case WASM:
+		return &FuncPlugin{}
 	default:
 		return &IOPlugin{}
 	}

+ 5 - 1
internal/plugin/wasm/manager.go

@@ -1,4 +1,4 @@
-// Copyright erfenjiao, 630166475@qq.com.
+// Copyright 2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/internal/plugin/wasm/runtime"
+	"github.com/second-state/WasmEdge-go/wasmedge"
 )
 
 var manager *Manager
@@ -43,6 +44,7 @@ type Manager struct {
 }
 
 func InitManager() (*Manager, error) {
+	wasmedge.LoadPluginDefaultPaths()
 	pluginDir, err := conf.GetPluginsLoc()
 	if err != nil {
 		fmt.Println("[internal][wasm] cannot find plugins folder:", err)
@@ -77,6 +79,8 @@ func InitManager() (*Manager, error) {
 }
 
 func MockManager(plugins map[string]*PluginInfo) (*Manager, error) {
+	wasmedge.LoadPluginDefaultPaths()
+
 	registry := &registry{
 		RWMutex:   sync.RWMutex{},
 		plugins:   make(map[string]*PluginInfo),

+ 132 - 75
internal/plugin/wasm/runtime/function.go

@@ -1,4 +1,5 @@
 // Copyright erfenjiao, 630166475@qq.com.
+// Copyright 2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -18,10 +19,9 @@ import (
 	"fmt"
 	"log"
 
-	"github.com/second-state/WasmEdge-go/wasmedge"
-
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/second-state/WasmEdge-go/wasmedge"
 )
 
 type WasmFunc struct {
@@ -49,19 +49,11 @@ func (f *WasmFunc) Validate(args []interface{}) error {
 }
 
 func (f *WasmFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
-	res := f.ExecWasmFunc(args)
-
-	fr := &FuncReply{}
-	fr.Result = res
-	fr.State = true
-	if !fr.State {
-		if fr.Result != nil {
-			return fmt.Errorf("%s", fr.Result), false
-		} else {
-			return nil, false
-		}
+	res, err := f.ExecWasmFunc(args)
+	if err != nil {
+		return err, false
 	}
-	return fr.Result, fr.State
+	return res, true
 }
 
 func (f *WasmFunc) IsAggregate() bool {
@@ -71,7 +63,114 @@ func (f *WasmFunc) IsAggregate() bool {
 	return false
 }
 
-func (f *WasmFunc) ExecWasmFunc(args []interface{}) []interface{} {
+func toWasmEdgeValueSlideBindgen(vm *wasmedge.VM, modname *string, vals ...interface{}) ([]interface{}, error) {
+	rvals := []interface{}{}
+
+	for _, val := range vals {
+		switch t := val.(type) {
+		case wasmedge.FuncRef:
+			rvals = append(rvals, val)
+		case wasmedge.ExternRef:
+			rvals = append(rvals, val)
+		case wasmedge.V128:
+			rvals = append(rvals, val)
+		case int32:
+			rvals = append(rvals, val)
+		case uint32:
+			rvals = append(rvals, val)
+		case int64:
+			rvals = append(rvals, val)
+		case uint64:
+			rvals = append(rvals, val)
+		case int:
+			rvals = append(rvals, val)
+		case uint:
+			rvals = append(rvals, val)
+		case float32:
+			rvals = append(rvals, val)
+		case float64:
+			rvals = append(rvals, val)
+		case string:
+			// Call malloc function
+			sval := []byte(val.(string))
+			mallocsize := uint32(len(sval))
+			var rets []interface{}
+			var err error = nil
+			if modname == nil {
+				rets, err = vm.Execute("malloc", mallocsize+1)
+			} else {
+				rets, err = vm.ExecuteRegistered(*modname, "malloc", mallocsize)
+			}
+			if err != nil {
+				return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): malloc failed with error %v", err)
+			}
+			if len(rets) <= 0 {
+				return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): malloc function signature unexpected")
+			}
+			argaddr := rets[0]
+			rvals = append(rvals, argaddr)
+			// Set bytes
+			var mod *wasmedge.Module = nil
+			var mem *wasmedge.Memory = nil
+			if modname == nil {
+				mod = vm.GetActiveModule()
+			} else {
+				store := vm.GetStore()
+				mod = store.FindModule(*modname)
+			}
+			if mod != nil {
+				memnames := mod.ListMemory()
+				if len(memnames) <= 0 {
+					return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): memory instance not found")
+				}
+				mem = mod.FindMemory(memnames[0])
+				mem.SetData(sval, uint(rets[0].(int32)), uint(mallocsize))
+				mem.SetData([]byte{0}, uint(rets[0].(int32)+int32(mallocsize)), 1)
+			}
+		case []byte:
+			// Call malloc function
+			mallocsize := uint32(len(val.([]byte)))
+			var rets []interface{}
+			var err error = nil
+			if modname == nil {
+				rets, err = vm.Execute("malloc", mallocsize)
+			} else {
+				rets, err = vm.ExecuteRegistered(*modname, "malloc", mallocsize)
+			}
+			if err != nil {
+				return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): malloc failed")
+			}
+			if len(rets) <= 0 {
+				return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): malloc function signature unexpected")
+			}
+			argaddr := rets[0]
+			argsize := mallocsize
+			rvals = append(rvals, argaddr, argsize)
+			// Set bytes
+			var mod *wasmedge.Module = nil
+			var mem *wasmedge.Memory = nil
+			if modname == nil {
+				mod = vm.GetActiveModule()
+			} else {
+				store := vm.GetStore()
+				mod = store.FindModule(*modname)
+			}
+			if mod != nil {
+				memnames := mod.ListMemory()
+				if len(memnames) <= 0 {
+					return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): memory instance not found")
+				}
+				mem = mod.FindMemory(memnames[0])
+				mem.SetData(val.([]byte), uint(rets[0].(int32)), uint(mallocsize))
+			}
+		default:
+			return nil, fmt.Errorf("wrong argument of toWasmEdgeValueSlideBindgen(): %T not supported", t)
+		}
+	}
+	return rvals, nil
+}
+
+func (f *WasmFunc) ExecWasmFunc(args []interface{}) ([]interface{}, error) {
 	funcname := f.symbolName
 
 	WasmFile := f.reg.WasmFile
@@ -84,81 +183,39 @@ func (f *WasmFunc) ExecWasmFunc(args []interface{}) []interface{} {
 	err := vm.LoadWasmFile(WasmFile)
 	if err != nil {
 		fmt.Print("[wasm][ExecWasmFunc] Load WASM from file FAILED: ")
-		fmt.Errorf(err.Error())
+		return nil, err
 	}
 	// step 2: Validate the WASM module
 	err = vm.Validate()
 	if err != nil {
 		fmt.Print("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Validate FAILED: ")
-		fmt.Errorf(err.Error())
+		return nil, err
 	}
 	// step 3: Instantiate the WASM moudle
 	err = vm.Instantiate()
 	if err != nil {
 		fmt.Print("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Instantiate FAILED: ")
-		fmt.Errorf(err.Error())
+		return nil, err
 	}
 	// step 4: Execute WASM functions.Parameters(1)
-	var Args []float64
-	for _, num := range args {
-		x, ok := (num).(float64)
-		if !ok {
-			fmt.Println("Type tranform not to float64!!")
-		}
-		Args = append(Args, x)
+	Args, err := toWasmEdgeValueSlideBindgen(vm, nil, args...)
+	if err != nil {
+		return nil, err
 	}
 
-	Len := len(args)
 	var res []interface{}
-	switch Len {
-	case 0:
-		res, err = vm.Execute(funcname)
-		if err != nil {
-			log.Fatalln("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Run function failed: ", err.Error())
-		}
-		fmt.Print("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Get res: ")
-		fmt.Println(res[0].(int32))
-		exitcode := wasi.WasiGetExitCode()
-		if exitcode != 0 {
-			fmt.Println("Go: Running wasm failed, exit code:", exitcode)
-		}
-		vm.Release()
-	case 1:
-		res, err = vm.Execute(funcname, uint32(Args[0]))
-		if err != nil {
-			log.Fatalln("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Run function failed: ", err.Error())
-		}
-		fmt.Print("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Get res: ")
-		fmt.Println(res[0].(int32))
-		exitcode := wasi.WasiGetExitCode()
-		if exitcode != 0 {
-			fmt.Println("Go: Running wasm failed, exit code:", exitcode)
-		}
-		vm.Release()
-	case 2:
-		res, err = vm.Execute(funcname, uint32(Args[0]), uint32(Args[1]))
-		if err != nil {
-			log.Fatalln("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Run function failed: ", err.Error())
-		}
-		fmt.Print("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Get res: ")
-		fmt.Println(res[0].(int32))
-		exitcode := wasi.WasiGetExitCode()
-		if exitcode != 0 {
-			fmt.Println("Go: Running wasm failed, exit code:", exitcode)
-		}
-		vm.Release()
-	case 3:
-		res, err = vm.Execute(funcname, uint32(Args[0]), uint32(Args[1]), uint32(Args[2]))
-		if err != nil {
-			log.Fatalln("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Run function failed: ", err.Error())
-		}
+	res, err = vm.Execute(funcname, Args...)
+	if err != nil {
+		log.Fatalln("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Run function failed: ", err.Error())
+		return nil, err
+	} else {
 		fmt.Print("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Get res: ")
-		fmt.Println(res[0].(int32))
-		exitcode := wasi.WasiGetExitCode()
-		if exitcode != 0 {
-			fmt.Println("Go: Running wasm failed, exit code:", exitcode)
-		}
-		vm.Release()
+		fmt.Println(res[0])
+	}
+	exitcode := wasi.WasiGetExitCode()
+	if exitcode != 0 {
+		fmt.Println("Go: Running wasm failed, exit code:", exitcode)
 	}
-	return res
+	vm.Release()
+	return res, nil
 }

+ 1 - 5
internal/plugin/wasm/runtime/shard.go

@@ -1,4 +1,5 @@
 // Copyright erfenjiao, 630166475@qq.com.
+// Copyright 2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -43,11 +44,6 @@ type FuncData struct {
 	Arg  interface{} `json:"arg"`
 }
 
-type FuncReply struct {
-	State  bool        `json:"state"`
-	Result interface{} `json:"result"`
-}
-
 type PluginMeta struct {
 	Name       string `json:"name"`
 	Version    string `json:"version"`

+ 7 - 0
internal/plugin/wasm/wasm_test/get_exchange_rate.json

@@ -0,0 +1,7 @@
+{
+    "version": "v1.0.0",
+    "functions": [
+        "get_exchange_rate"
+    ],
+    "wasmEngine": "wasmedge"
+}

BIN
internal/plugin/wasm/wasm_test/get_exchange_rate.wasm


+ 4 - 3
internal/plugin/wasm/wasm_test/wasm_test.go

@@ -31,15 +31,16 @@ import (
 // EDIT HERE: Define the plugins that you want to test.
 var testingPlugin = &wasm.PluginInfo{
 	PluginMeta: runtime.PluginMeta{
-		Name:       "fibonacci",
+		Name:       "get_exchange_rate",
 		Version:    "v1",
 		WasmEngine: "wasmedge",
 	},
-	Functions: []string{"fib"},
+	Functions: []string{"get_exchange_rate"},
 }
 
 var FuncData = []interface{}{
-	25.0, // float
+	// int32(25),
+	"USD", "CNY",
 }
 
 var (

+ 1 - 1
internal/server/core.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build core
+//go:build core && !plugin && !portable && !service && !schema
 
 package server
 

+ 2 - 2
internal/server/rpc_plugin_both.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build !core || (rpc && portable && plugin)
+//go:build (!core && !wasmedge) || (rpc && portable && plugin)
 
 package server
 

+ 51 - 14
internal/server/rpc_plugin_wasm.go

@@ -1,4 +1,4 @@
-// Copyright erfenjiao, 630166475@qq.com.
+// Copyright 2023-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,41 +12,78 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build wasm
-// +build wasm
+//go:build !core && wasmedge
 
 package server
 
 import (
+	"encoding/json"
 	"fmt"
-
 	"github.com/lf-edge/ekuiper/internal/plugin"
+	"strings"
 )
 
 func (t *Server) doRegister(pt plugin.PluginType, p plugin.Plugin) error {
-	if pt == plugin.WASM {
+	if pt == plugin.PORTABLE {
+		return portableManager.Register(p)
+	} else if pt == plugin.WASM {
 		return wasmManager.Register(p)
 	} else {
-		return fmt.Errorf("wasm plugin support is disabled")
+		return nativeManager.Register(pt, p)
 	}
 }
 
 func (t *Server) doDelete(pt plugin.PluginType, name string, stopRun bool) error {
-	if pt == plugin.WASM {
+	if pt == plugin.PORTABLE {
+		return portableManager.Delete(name)
+	} else if pt == plugin.WASM {
 		return wasmManager.Delete(name)
 	} else {
-		return fmt.Errorf("wasm plugin support is disabled")
+		return nativeManager.Delete(pt, name, stopRun)
 	}
 }
 
 func (t *Server) doDesc(pt plugin.PluginType, name string) (interface{}, error) {
-	if pt == plugin.WASM {
-		r, ok := wasmManager.GetPluginInfo(name)
-		if !ok {
-			return nil, fmt.Errorf("not found")
+	var (
+		result interface{}
+		ok     bool
+	)
+	if pt == plugin.PORTABLE {
+		result, ok = portableManager.GetPluginInfo(name)
+	} else if pt == plugin.WASM {
+		result, ok = wasmManager.GetPluginInfo(name)
+	} else {
+		result, ok = nativeManager.GetPluginInfo(pt, name)
+	}
+	if !ok {
+		return nil, fmt.Errorf("not found")
+	}
+	return result, nil
+}
+
+func (t *Server) doShow(pt plugin.PluginType) (string, error) {
+	var result string
+	if pt == plugin.PORTABLE {
+		l := portableManager.List()
+		jb, err := json.Marshal(l)
+		if err != nil {
+			return "", err
 		}
-		return r, nil
+		return string(jb), nil
+	} else if pt == plugin.WASM {
+		l := portableManager.List()
+		jb, err := json.Marshal(l)
+		if err != nil {
+			return "", err
+		}
+		return string(jb), nil
 	} else {
-		return nil, fmt.Errorf("wasm plugin support is disabled")
+		l := nativeManager.List(pt)
+		if len(l) == 0 {
+			result = "No plugin is found."
+		} else {
+			result = strings.Join(l, "\n")
+		}
+		return result, nil
 	}
 }

+ 2 - 2
internal/server/wasm_init.go

@@ -12,8 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build wasm
-// +build wasm
+//go:build wasmedge
+// +build wasmedge
 
 package server