function.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // Copyright erfenjiao, 630166475@qq.com.
  2. // Copyright 2023 EMQ Technologies Co., Ltd.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. package runtime
  16. import (
  17. "fmt"
  18. "log"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/second-state/WasmEdge-go/wasmedge"
  22. )
  23. type WasmFunc struct {
  24. symbolName string
  25. reg *PluginMeta
  26. isAgg int
  27. }
  28. func NewWasmFunc(symbolName string, reg *PluginMeta) (*WasmFunc, error) {
  29. // Setup channel and route the data
  30. conf.Log.Infof("Start running wasm function meta %+v", reg)
  31. return &WasmFunc{
  32. symbolName: symbolName,
  33. reg: reg,
  34. }, nil
  35. }
  36. func (f *WasmFunc) Validate(args []interface{}) error {
  37. if len(args) == 0 {
  38. fmt.Println("[plugin][wasm][runtime][Validate] args is null")
  39. }
  40. var err error
  41. return err
  42. }
  43. func (f *WasmFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
  44. res, err := f.ExecWasmFunc(args)
  45. if err != nil {
  46. return err, false
  47. }
  48. return res, true
  49. }
  50. func (f *WasmFunc) IsAggregate() bool {
  51. if f.isAgg > 0 {
  52. return f.isAgg > 1
  53. }
  54. return false
  55. }
  56. func toWasmEdgeValueSlideBindgen(vm *wasmedge.VM, modname *string, vals ...interface{}) ([]interface{}, error) {
  57. rvals := []interface{}{}
  58. for _, val := range vals {
  59. switch t := val.(type) {
  60. case wasmedge.FuncRef:
  61. rvals = append(rvals, val)
  62. case wasmedge.ExternRef:
  63. rvals = append(rvals, val)
  64. case wasmedge.V128:
  65. rvals = append(rvals, val)
  66. case int32:
  67. rvals = append(rvals, val)
  68. case uint32:
  69. rvals = append(rvals, val)
  70. case int64:
  71. rvals = append(rvals, val)
  72. case uint64:
  73. rvals = append(rvals, val)
  74. case int:
  75. rvals = append(rvals, val)
  76. case uint:
  77. rvals = append(rvals, val)
  78. case float32:
  79. rvals = append(rvals, val)
  80. case float64:
  81. rvals = append(rvals, val)
  82. case string:
  83. // Call malloc function
  84. sval := []byte(val.(string))
  85. mallocsize := uint32(len(sval))
  86. var rets []interface{}
  87. var err error = nil
  88. if modname == nil {
  89. rets, err = vm.Execute("malloc", mallocsize+1)
  90. } else {
  91. rets, err = vm.ExecuteRegistered(*modname, "malloc", mallocsize)
  92. }
  93. if err != nil {
  94. return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): malloc failed with error %v", err)
  95. }
  96. if len(rets) <= 0 {
  97. return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): malloc function signature unexpected")
  98. }
  99. argaddr := rets[0]
  100. rvals = append(rvals, argaddr)
  101. // Set bytes
  102. var mod *wasmedge.Module = nil
  103. var mem *wasmedge.Memory = nil
  104. if modname == nil {
  105. mod = vm.GetActiveModule()
  106. } else {
  107. store := vm.GetStore()
  108. mod = store.FindModule(*modname)
  109. }
  110. if mod != nil {
  111. memnames := mod.ListMemory()
  112. if len(memnames) <= 0 {
  113. return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): memory instance not found")
  114. }
  115. mem = mod.FindMemory(memnames[0])
  116. mem.SetData(sval, uint(rets[0].(int32)), uint(mallocsize))
  117. mem.SetData([]byte{0}, uint(rets[0].(int32)+int32(mallocsize)), 1)
  118. }
  119. case []byte:
  120. // Call malloc function
  121. mallocsize := uint32(len(val.([]byte)))
  122. var rets []interface{}
  123. var err error = nil
  124. if modname == nil {
  125. rets, err = vm.Execute("malloc", mallocsize)
  126. } else {
  127. rets, err = vm.ExecuteRegistered(*modname, "malloc", mallocsize)
  128. }
  129. if err != nil {
  130. return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): malloc failed")
  131. }
  132. if len(rets) <= 0 {
  133. return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): malloc function signature unexpected")
  134. }
  135. argaddr := rets[0]
  136. argsize := mallocsize
  137. rvals = append(rvals, argaddr, argsize)
  138. // Set bytes
  139. var mod *wasmedge.Module = nil
  140. var mem *wasmedge.Memory = nil
  141. if modname == nil {
  142. mod = vm.GetActiveModule()
  143. } else {
  144. store := vm.GetStore()
  145. mod = store.FindModule(*modname)
  146. }
  147. if mod != nil {
  148. memnames := mod.ListMemory()
  149. if len(memnames) <= 0 {
  150. return nil, fmt.Errorf("toWasmEdgeValueSlideBindgen(): memory instance not found")
  151. }
  152. mem = mod.FindMemory(memnames[0])
  153. mem.SetData(val.([]byte), uint(rets[0].(int32)), uint(mallocsize))
  154. }
  155. default:
  156. return nil, fmt.Errorf("wrong argument of toWasmEdgeValueSlideBindgen(): %T not supported", t)
  157. }
  158. }
  159. return rvals, nil
  160. }
  161. func (f *WasmFunc) ExecWasmFunc(args []interface{}) ([]interface{}, error) {
  162. funcname := f.symbolName
  163. WasmFile := f.reg.WasmFile
  164. fmt.Println("[wasm][ExecWasmFunc] WasmFile: ", WasmFile)
  165. conf1 := wasmedge.NewConfigure(wasmedge.WASI)
  166. store := wasmedge.NewStore()
  167. vm := wasmedge.NewVMWithConfigAndStore(conf1, store)
  168. wasi := vm.GetImportModule(wasmedge.WASI)
  169. // step 1: Load WASM file
  170. err := vm.LoadWasmFile(WasmFile)
  171. if err != nil {
  172. fmt.Print("[wasm][ExecWasmFunc] Load WASM from file FAILED: ")
  173. return nil, err
  174. }
  175. // step 2: Validate the WASM module
  176. err = vm.Validate()
  177. if err != nil {
  178. fmt.Print("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Validate FAILED: ")
  179. return nil, err
  180. }
  181. // step 3: Instantiate the WASM moudle
  182. err = vm.Instantiate()
  183. if err != nil {
  184. fmt.Print("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Instantiate FAILED: ")
  185. return nil, err
  186. }
  187. // step 4: Execute WASM functions.Parameters(1)
  188. Args, err := toWasmEdgeValueSlideBindgen(vm, nil, args...)
  189. if err != nil {
  190. return nil, err
  191. }
  192. var res []interface{}
  193. res, err = vm.Execute(funcname, Args...)
  194. if err != nil {
  195. log.Fatalln("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Run function failed: ", err.Error())
  196. return nil, err
  197. } else {
  198. fmt.Print("[wasm][manager-AddWasmPlugin-NewWasmPlugin] Get res: ")
  199. fmt.Println(res[0])
  200. }
  201. exitcode := wasi.WasiGetExitCode()
  202. if exitcode != 0 {
  203. fmt.Println("Go: Running wasm failed, exit code:", exitcode)
  204. }
  205. vm.Release()
  206. return res, nil
  207. }