Browse Source

fea(plug): add general tensorflow function (#1513)

* fea(plug): add general tensorflow function

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fea(plug): add general tensorflow function

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fea(plug): add general tensorflow function

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fea(plug): add general tensorflow function

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Signed-off-by: Jianxiang Ran <jianxiang.ran@emqx.io>

* fea(plug): add general tensorflow function

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fea(plug): add general tensorflow function

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fea(plug): add general tensorflow function

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Signed-off-by: Jianxiang Ran <jianxiang.ran@emqx.io>

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Signed-off-by: Jianxiang Ran <jianxiang.ran@emqx.io>
superxan 2 years ago
parent
commit
a5852b58cb

+ 1 - 0
.github/workflows/build_packages.yaml

@@ -193,6 +193,7 @@ jobs:
           - functions/image
           - functions/geohash
           - functions/labelImage
+          - functions/tfLite
         arch:
           - linux/amd64
           - linux/arm64

+ 2 - 1
Makefile

@@ -97,7 +97,8 @@ PLUGINS := sinks/file \
 	functions/image \
 	functions/geohash \
 	functions/echo \
-	functions/labelImage
+	functions/labelImage \
+	functions/tfLite
 
 .PHONY: plugins $(PLUGINS)
 plugins: $(PLUGINS)

+ 17 - 3
build-plugins.sh

@@ -44,15 +44,29 @@ build(){
             go build -trimpath -modfile extensions.mod --buildmode=plugin -tags plugins -o extensions/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so extensions/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go
             ;;
         labelImage )
-            git clone -b v2.2.0-rc3 --depth 1 https://github.com/tensorflow/tensorflow.git /tmp/tensorflow;
+            if [ ! -d "/tmp/tensorflow" ];then
+                git clone -b v2.2.0-rc3 --depth 1 https://github.com/tensorflow/tensorflow.git /tmp/tensorflow;
+            fi;
             if [ "$(uname -m)" = "x86_64" ]; then
-                mv $(pwd)/extensions/functions/labelImage/dependencies/amd64/*.so $(pwd)/extensions/functions/labelImage/lib
+                cp $(pwd)/extensions/functions/dependencies/tensorflow/amd64/*.so $(pwd)/extensions/functions/labelImage/lib
             fi;
             if [ "$(uname -m)" = "aarch64" ]; then
-                mv $(pwd)/extensions/functions/labelImage/dependencies/arm64/*.so $(pwd)/extensions/functions/labelImage/lib
+                cp $(pwd)/extensions/functions/dependencies/tensorflow/arm64/*.so $(pwd)/extensions/functions/labelImage/lib
             fi;
             CGO_CFLAGS=-I/tmp/tensorflow CGO_LDFLAGS=-L$(pwd)/extensions/functions/labelImage/lib go build -trimpath -modfile extensions.mod --buildmode=plugin -o extensions/functions/labelImage/labelImage.so extensions/functions/labelImage/*.go
             ;;
+        tfLite )
+            if [ ! -d "/tmp/tensorflow" ];then
+                git clone -b v2.2.0-rc3 --depth 1 https://github.com/tensorflow/tensorflow.git /tmp/tensorflow;
+            fi;
+            if [ "$(uname -m)" = "x86_64" ]; then
+                cp $(pwd)/extensions/functions/dependencies/tensorflow/amd64/*.so $(pwd)/extensions/functions/tfLite/lib
+            fi;
+            if [ "$(uname -m)" = "aarch64" ]; then
+                cp $(pwd)/extensions/functions/dependencies/tensorflow/arm64/*.so $(pwd)/extensions/functions/tfLite/lib
+            fi;
+                CGO_CFLAGS=-I/tmp/tensorflow CGO_LDFLAGS=-L$(pwd)/extensions/functions/tfLite/lib go build -trimpath -modfile extensions.mod --buildmode=plugin -o extensions/functions/tfLite/tfLite.so extensions/functions/tfLite/*.go
+            ;;
         * )
             go build -trimpath -modfile extensions.mod --buildmode=plugin -o extensions/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so extensions/$PLUGIN_TYPE/$PLUGIN_NAME/*.go
           ;;

extensions/functions/labelImage/dependencies/amd64/libtensorflowlite.so → extensions/functions/dependencies/tensorflow/amd64/libtensorflowlite.so


extensions/functions/labelImage/dependencies/amd64/libtensorflowlite_c.so → extensions/functions/dependencies/tensorflow/amd64/libtensorflowlite_c.so


extensions/functions/labelImage/dependencies/arm64/libtensorflowlite.so → extensions/functions/dependencies/tensorflow/arm64/libtensorflowlite.so


extensions/functions/labelImage/dependencies/arm64/libtensorflowlite_c.so → extensions/functions/dependencies/tensorflow/arm64/libtensorflowlite_c.so


+ 1 - 1
extensions/functions/geohash/geohash.json

@@ -17,7 +17,7 @@
 		}
 	},
 	"libs": ["github.com/mmcloughlin/geohash@master"],
-  "name":"geohash",
+	"name":"geohash",
 	"functions": [{
 		"name": "geohashEncode",
 		"example": "geohashEncode(la,lo)",

+ 0 - 3
extensions/functions/labelImage/labelImage.go

@@ -12,9 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build tflite
-// +build tflite
-
 package main
 
 import (

+ 40 - 0
extensions/functions/tfLite/install.sh

@@ -0,0 +1,40 @@
+#!/bin/sh
+#
+# Copyright 2021 EMQ Technologies Co., Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+dir=/usr/local/tflite
+cur=$(dirname "$0")
+echo "Base path $cur" 
+if [ -d "$dir" ]; then
+    echo "SDK path $dir exists." 
+else
+    echo "Creating SDK path $dir"
+    mkdir -p $dir
+    echo "Created SDK path $dir"
+    echo "Moving libs"
+    cp -R $cur/lib $dir
+    echo "Moved libs"
+fi
+
+if [ -f "/etc/ld.so.conf.d/tflite.conf" ]; then
+    echo "/etc/ld.so.conf.d/tflite.conf exists"
+else
+    echo "Copy conf file"
+    cp $cur/tflite.conf /etc/ld.so.conf.d/
+    echo "Copied conf file"
+fi
+ldconfig
+echo "Done"

+ 70 - 0
extensions/functions/tfLite/interpreters.go

@@ -0,0 +1,70 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/mattn/go-tflite"
+	"path/filepath"
+	"sync"
+)
+
+var ipManager *interpreterManager
+
+func init() {
+	path, err := conf.GetDataLoc()
+	if err != nil {
+		panic(err)
+	}
+	ipManager = &interpreterManager{
+		registry: make(map[string]*tflite.Interpreter),
+		path:     filepath.Join(path, "etc"),
+	}
+}
+
+type interpreterManager struct {
+	sync.Mutex
+	registry map[string]*tflite.Interpreter
+	path     string
+}
+
+func (m *interpreterManager) GetOrCreate(name string) (*tflite.Interpreter, error) {
+	m.Lock()
+	defer m.Unlock()
+	ip, ok := m.registry[name]
+	if !ok {
+		mf := filepath.Join(m.path, name+".tflite")
+		model := tflite.NewModelFromFile(mf)
+		if model == nil {
+			return nil, fmt.Errorf("fail to load model: %s", mf)
+		}
+		defer model.Delete()
+		options := tflite.NewInterpreterOptions()
+		options.SetNumThread(4)
+		options.SetErrorReporter(func(msg string, user_data interface{}) {
+			fmt.Println(msg)
+		}, nil)
+		defer options.Delete()
+		ip = tflite.NewInterpreter(model, options)
+		status := ip.AllocateTensors()
+		if status != tflite.OK {
+			ip.Delete()
+			return nil, fmt.Errorf("allocate failed: %v", status)
+		}
+		m.registry[name] = ip
+	}
+	return ip, nil
+}

+ 25 - 0
extensions/functions/tfLite/lib/Readme.md

@@ -0,0 +1,25 @@
+# Tensorflow Lite C API library
+
+This is the prebuilt tensorflow lite c library for debian 10. It can be used directly in eKuiper docker image of tags x.x.x or x.x.x-slim.
+
+To use in other environment, you need to build the library from source.
+
+## Build from source
+
+Here are the steps to build from source in debian. 
+
+1. Install [Python](https://www.tensorflow.org/install/pip#1.-install-the-python-development-environment-on-your-system)
+
+2. Install required python lib: `pip3 install -r requirements.txt`. The requirements are from `tensorflow/tensorflow/tools/pip_package/setup.py` of the corresponding tensorflow version.
+
+3. Install [Bazel](https://docs.bazel.build/versions/4.0.0/install-ubuntu.html)
+
+4. Clone [tensorflow](https://github.com/tensorflow/tensorflow),switch to `git checkout v2.2.0-rc3 -b mybranch`
+
+5. Build the so files, the outputs are in ./bazel-bin
+
+   ```bash
+   $ cd $tensorflowSrc
+   $ bazel build --config monolithic -c opt //tensorflow/lite:libtensorflowlite.so
+   $ bazel build --config monolithic -c opt //tensorflow/lite/c:libtensorflowlite_c.so
+   ```

+ 262 - 0
extensions/functions/tfLite/tfLite.go

@@ -0,0 +1,262 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/mattn/go-tflite"
+	"strconv"
+)
+
+type Tffunc struct {
+}
+
+// Validate the arguments.
+// args[0]: string, model name which maps to a path
+// args[1 to n]: tensors
+func (f *Tffunc) Validate(args []interface{}) error {
+	if len(args) < 2 {
+		return fmt.Errorf("tensorflow function must have at least 2 parameters but got %d", len(args))
+	}
+	return nil
+}
+
+func (f *Tffunc) IsAggregate() bool {
+	return false
+}
+
+func (f *Tffunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
+	model, ok := args[0].(string)
+	if !ok {
+		return fmt.Errorf("tensorflow function first parameter must be a string, but got %[1]T(%[1]v)", args[0]), false
+	}
+	interpreter, err := ipManager.GetOrCreate(model)
+	if err != nil {
+		return err, false
+	}
+	inputCount := interpreter.GetInputTensorCount()
+	if len(args)-1 != inputCount {
+		return fmt.Errorf("tensorflow function requires %d tensors but got %d", inputCount, len(args)-1), false
+	}
+
+	ctx.GetLogger().Warnf("tensorflow function %s with %d tensors", model, inputCount)
+	// Set input tensors
+	for i := 1; i < len(args); i++ {
+		input := interpreter.GetInputTensor(i - 1)
+		dims := "("
+		for j := 1; j < input.NumDims(); j++ {
+			dims += strconv.Itoa(input.Dim(j)) + ","
+		}
+		dims += ")"
+		ctx.GetLogger().Warnf("tensorflow function %s input %d shape %s", model, i, dims)
+		var arg []interface{}
+		switch v := args[i].(type) {
+		case []byte:
+			if int(input.ByteSize()) != len(v) {
+				return fmt.Errorf("tensorflow function input tensor %d has %d bytes but got %d", i-1, input.ByteSize(), len(v)), false
+			}
+			input.CopyFromBuffer(v)
+			continue
+		case []interface{}:
+			arg = v
+		default:
+			return fmt.Errorf("tensorflow function parameter %d must be a bytea or array of bytea, but got %[1]T(%[1]v)", i), false
+		}
+		t := input.Type()
+		ctx.GetLogger().Warnf("tensor %d input dims %d type %s", i-1, input.NumDims(), t)
+		for j := 0; j < input.NumDims(); j++ {
+			ctx.GetLogger().Warnf("tensor %d input dim %d %d", i-1, j, input.Dim(j))
+		}
+		switch input.NumDims() {
+		case 0, 1:
+			return fmt.Errorf("tensorflow function input tensor %d must have at least 2 dimensions but got 1", i-1), false
+		case 2:
+			if input.Dim(1) != len(arg) {
+				return fmt.Errorf("tensorflow function input tensor %d must have %d elements but got %d", i-1, input.Dim(1), len(arg)), false
+			}
+			switch t {
+			case tflite.Float32:
+				v, err := cast.ToFloat32Slice(arg, cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect float32 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetFloat32s(v)
+				if err != nil {
+					return nil, false
+				}
+			case tflite.Int64:
+				v, err := cast.ToInt64Slice(arg, cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect int64 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetInt64s(v)
+				if err != nil {
+					return nil, false
+				}
+			case tflite.Int32:
+				v, err := cast.ToTypedSlice(args, func(input interface{}, sn cast.Strictness) (interface{}, error) {
+					return cast.ToInt32(input, sn)
+				}, "int32", cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect int32 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetInt32s(v.([]int32))
+				if err != nil {
+					return nil, false
+				}
+			case tflite.Int16:
+				v, err := cast.ToTypedSlice(args, func(input interface{}, sn cast.Strictness) (interface{}, error) {
+					return cast.ToInt16(input, sn)
+				}, "int16", cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect int16 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetInt16s(v.([]int16))
+				if err != nil {
+					return nil, false
+				}
+			case tflite.Int8:
+				v, err := cast.ToTypedSlice(args, func(input interface{}, sn cast.Strictness) (interface{}, error) {
+					return cast.ToInt8(input, sn)
+				}, "int8", cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect int8 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetInt8s(v.([]int8))
+				if err != nil {
+					return nil, false
+				}
+			case tflite.UInt8:
+				v, err := cast.ToBytes(args, cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect uint8 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetUint8s(v)
+				if err != nil {
+					return nil, false
+				}
+			default:
+				return fmt.Errorf("invalid %d parameter, unsupported type %v in the model", i, t), false
+			}
+		default:
+			// TODO support multiple dimensions. Here assume user passes a 1D array.
+			//if input.Dim(1)*input.Dim(2) != len(arg) {
+			//	return fmt.Errorf("tensorflow function input tensor %d must have %d elements but got %d", i-1, input.Dim(1), len(arg)), false
+			//}
+			switch t {
+			case tflite.Float32:
+				v, err := cast.ToFloat32Slice(args[i], cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect float32 but got %[2]T(%[2]v)", i, args[i]), false
+				}
+				err = input.SetFloat32s(v)
+				if err != nil {
+					return nil, false
+				}
+			case tflite.Int64:
+				v, err := cast.ToInt64Slice(arg, cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect int64 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetInt64s(v)
+				if err != nil {
+					return nil, false
+				}
+			case tflite.Int32:
+				v, err := cast.ToTypedSlice(args, func(input interface{}, sn cast.Strictness) (interface{}, error) {
+					return cast.ToInt32(input, sn)
+				}, "int32", cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect int32 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetInt32s(v.([]int32))
+				if err != nil {
+					return nil, false
+				}
+			case tflite.Int16:
+				v, err := cast.ToTypedSlice(args, func(input interface{}, sn cast.Strictness) (interface{}, error) {
+					return cast.ToInt16(input, sn)
+				}, "int16", cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect int16 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetInt16s(v.([]int16))
+				if err != nil {
+					return nil, false
+				}
+			case tflite.Int8:
+				v, err := cast.ToTypedSlice(args, func(input interface{}, sn cast.Strictness) (interface{}, error) {
+					return cast.ToInt8(input, sn)
+				}, "int8", cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect int8 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetInt8s(v.([]int8))
+				if err != nil {
+					return nil, false
+				}
+			case tflite.UInt8:
+				v, err := cast.ToBytes(args, cast.CONVERT_SAMEKIND)
+				if err != nil {
+					return fmt.Errorf("invalid %d parameter, expect uint8 but got %[2]T(%[2]v) with err %v", i, args[i], err), false
+				}
+				err = input.SetUint8s(v)
+				if err != nil {
+					return nil, false
+				}
+			default:
+				return fmt.Errorf("invalid %d parameter, unsupported type %v in the model", i, t), false
+			}
+		}
+	}
+	status := interpreter.Invoke()
+	if status != tflite.OK {
+		return fmt.Errorf("invoke failed"), false
+	}
+	outputCount := interpreter.GetOutputTensorCount()
+	results := make([]interface{}, outputCount)
+	for i := 0; i < outputCount; i++ {
+		output := interpreter.GetOutputTensor(i)
+		//outputSize := output.Dim(output.NumDims() - 1)
+		//b := make([]byte, outputSize)
+		//status = output.CopyToBuffer(&b[0])
+		//if status != tflite.OK {
+		//	return fmt.Errorf("output failed"), false
+		//}
+		//results[i] = b
+		t := output.Type()
+		switch t {
+		case tflite.Float32:
+			results[i] = output.Float32s()
+		case tflite.Int64:
+			results[i] = output.Int64s()
+		case tflite.Int32:
+			results[i] = output.Int32s()
+		case tflite.Int16:
+			results[i] = output.Int16s()
+		case tflite.Int8:
+			results[i] = output.Int8s()
+		case tflite.UInt8:
+			results[i] = output.UInt8s()
+		default:
+			return fmt.Errorf("invalid %d parameter, unsupported type %v in the model", i, t), false
+		}
+	}
+	return results, true
+}
+
+var TfLite Tffunc

+ 91 - 0
extensions/functions/tfLite/tfLite.json

@@ -0,0 +1,91 @@
+{
+	"about": {
+		"trial": false,
+		"author": {
+			"name": "EMQ",
+			"email": "contact@emqx.io",
+			"company": "EMQ Technologies Co., Ltd",
+			"website": "https://www.emqx.io"
+		},
+		"helpUrl": {
+			"en_US": "https://github.com/lf-edge/ekuiper/blob/master/docs/en_US/sqls/custom_functions.md",
+			"zh_CN": "https://github.com/lf-edge/ekuiper/blob/master/docs/zh_CN/sqls/custom_functions.md"
+		},
+		"description": {
+			"en_US": "",
+			"zh_CN": ""
+		}
+	},
+	"libs": [],
+	"name": "tfLite",
+	"functions": [{
+		"name": "tflite",
+		"example": "tflite(model,para1, para2,...)",
+		"hint": {
+			"en_US": "Creates a scaled image with new dimensions (width, height) .If either width or height is set to 0, it will be set to an aspect ratio preserving value.",
+			"zh_CN": "创建具有新尺寸(宽度,高度)的缩放图像。如果width或height设置为0,则将其设置为长宽比保留值。"
+		},
+		"args": [
+			{
+				"name": "model",
+				"hidden": true,
+				"optional": false,
+				"control": "text",
+				"type": "string",
+				"hint": {
+					"en_US": "Input data",
+					"zh_CN": "输入模型"
+				},
+				"label": {
+					"en_US": "Input data",
+					"zh_CN": "输入模型"
+				}
+			},{
+				"name": "para1",
+				"hidden": true,
+				"optional": false,
+				"control": "text",
+				"type": "int",
+				"hint": {
+					"en_US": "Input data",
+					"zh_CN": "输入数据1"
+				},
+				"label": {
+					"en_US": "Input data",
+					"zh_CN": "输入数据1"
+				}
+			},{
+				"name": "para2",
+				"hidden": true,
+				"optional": false,
+				"control": "text",
+				"type": "int",
+				"hint": {
+					"en_US": "Input height",
+					"zh_CN": "输入数据2"
+				},
+				"label": {
+					"en_US": "Input height",
+					"zh_CN": "输入数据2"
+				}
+			}
+		],
+		"outputs": [
+			{
+				"label": {
+					"en_US": "Output",
+					"zh_CN": "输出"
+				},
+				"value": "tflite"
+			}
+		],
+		"node": {
+			"category": "abc",
+			"icon": "iconPath",
+			"label": {
+				"en_US": "tflite",
+				"zh_CN": "tflite"
+			}
+		}
+	}]
+}

+ 2 - 2
internal/server/plugin_init.go

@@ -234,9 +234,9 @@ func prebuildPluginsHandler(w http.ResponseWriter, _ *http.Request, t plugin.Plu
 	}
 }
 
-var NativeSourcePlugin = []string{"random", "zmq", "sql"}
+var NativeSourcePlugin = []string{"random", "zmq", "sql", "video"}
 var NativeSinkPlugin = []string{"file", "image", "influx", "tdengine", "zmq", "sql"}
-var NativeFunctionPlugin = []string{"accumulateWordCount", "countPlusOne", "echo", "geohash", "image", "labelImage"}
+var NativeFunctionPlugin = []string{"accumulateWordCount", "countPlusOne", "echo", "geohash", "image", "labelImage", "tfLite"}
 
 func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (err error, result map[string]string) {
 	ptype := "sources"

+ 16 - 0
pkg/cast/cast.go

@@ -897,6 +897,22 @@ func ToFloat64Slice(input interface{}, sn Strictness) ([]float64, error) {
 	return result, nil
 }
 
+func ToFloat32Slice(input interface{}, sn Strictness) ([]float32, error) {
+	s := reflect.ValueOf(input)
+	if s.Kind() != reflect.Slice {
+		return nil, fmt.Errorf("cannot convert %[1]T(%[1]v) to float slice)", input)
+	}
+	var result []float32
+	for i := 0; i < s.Len(); i++ {
+		ele, err := ToFloat32(s.Index(i).Interface(), sn)
+		if err != nil {
+			return nil, fmt.Errorf("cannot convert %[1]T(%[1]v) to float slice for the %d element: %v", input, i, err)
+		}
+		result = append(result, ele)
+	}
+	return result, nil
+}
+
 func ToBoolSlice(input interface{}, sn Strictness) ([]bool, error) {
 	s := reflect.ValueOf(input)
 	if s.Kind() != reflect.Slice {