Bladeren bron

taos install (#547)

* feat():taos install script

* feat():add dir to the plugin

* feat():exit status code

* feat():add log when exec install.sh

* feat():add timeout

* feat():wget --read-timeout=240

* feat():Adjust the directory of the compiled plugin

* bug():Delete unzipped files when running script fails

* feat():get wget info

* feat():del wget log

* feat():add err log

* feat():emq.go->emq/emq.go

* chore(CI): update build plugins (#1)

Co-authored-by: Rory Z <Rory-Z@outlook.com>
EMQmyd 4 jaren geleden
bovenliggende
commit
b79e90e5cd

+ 5 - 5
.ci/Dockerfile-plugins

@@ -15,20 +15,20 @@ RUN set -e -u -x \
     && for lib in $(cat etc/$PLUGIN_TYPE/$PLUGIN_NAME.json | jq -r ".libs[]"); do go get $lib; done \
     && case $PLUGIN_NAME in \
          influxdb ) \
-           go build --buildmode=plugin -tags plugins -o _plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME.go \
+           go build --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \
            ;; \
          taos ) \
            if [ "$(uname -m)" = "x86_64" ]; then \
              wget "https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-2.0.3.1-Linux-x64.tar.gz" -O /tmp/TDengine-client-2.0.3.1-Linux-x64.tar.gz \
              && tar -zxvf /tmp/TDengine-client-2.0.3.1-Linux-x64.tar.gz \
              && cd TDengine-client && ./install_client.sh && cd - \
-             && go build --buildmode=plugin -tags plugins -o _plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME.go; \
+             && go build --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go; \
            fi \
            ;; \
          * ) \
-           go build --buildmode=plugin -o _plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME.go \
+           go build --buildmode=plugin -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \
            ;; \
        esac \
-    && if [ -f "etc/$PLUGIN_TYPE/$PLUGIN_NAME.yaml" ]; then cp etc/$PLUGIN_TYPE/$PLUGIN_NAME.yaml _plugins/$PLUGIN_TYPE/$PLUGIN_NAME; fi \
-    && cd _plugins/$PLUGIN_TYPE/$PLUGIN_NAME \
+    && if [ -f "etc/$PLUGIN_TYPE/$PLUGIN_NAME.yaml" ]; then cp etc/$PLUGIN_TYPE/$PLUGIN_NAME.yaml plugins/$PLUGIN_TYPE/$PLUGIN_NAME; fi \
+    && cd plugins/$PLUGIN_TYPE/$PLUGIN_NAME \
     && zip -r ${PLUGIN_NAME}_$(go version | grep -o "linux/.*" | sed -r 's linux/(.*) \1 g').zip .

+ 0 - 19
.ci/Dockerfile-plugins-alpine

@@ -1,19 +0,0 @@
-FROM golang:1.15.1-alpine AS builder
-
-ARG VERSION
-ARG PLUGIN_TYPE
-ARG PLUGIN_NAME
-
-COPY . /go/kuiper
-
-WORKDIR /go/kuiper
-
-RUN apk add gcc make git libc-dev binutils-gold pkgconfig zeromq-dev jq zip
-
-RUN set -e -u -x \
-    && mkdir -p _plugins/$PLUGIN_TYPE/$PLUGIN_NAME \
-    && for lib in $(cat etc/$PLUGIN_TYPE/$PLUGIN_NAME.json | jq -r ".libs[]"); do go get $lib; done \
-    && go build --buildmode=plugin -o _plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME.go \
-    && if [ -f "etc/$PLUGIN_TYPE/$PLUGIN_NAME.yaml" ]; then cp etc/$PLUGIN_TYPE/$PLUGIN_NAME.yaml _plugins/$PLUGIN_TYPE/$PLUGIN_NAME; fi \
-    && cd _plugins/$PLUGIN_TYPE/$PLUGIN_NAME \
-    && zip -r ${PLUGIN_NAME}_$(go version | grep -o "linux/.*" | sed -r 's linux/(.*) \1 g').zip .

+ 16 - 5
.github/workflows/build_packages.yaml

@@ -1,10 +1,10 @@
 name: Build packages
 
 on:
-    pull_request:
-    release:
-        types:
-            - published
+  pull_request:
+  release:
+    types:
+      - published
 
 jobs:
     build:
@@ -117,7 +117,18 @@ jobs:
                 sleep 5
                 if ! curl ${ip_address}:9081  >/dev/null 2>&1; then echo "docker image failed"; exit 1; fi
                 if [ ${os} = alpine ]; then continue; fi
-                curl ${ip_address}:9081/plugins/${plugin_type} -X POST -d "{\"name\":\"${plugin_name}\", \"file\":\"file:///var/plugins/${os}/${plugin_type}/${plugin_name}_amd64.zip\"}"
+                if [ "${plugin_name}" = "taos" ]; then
+                    curl \
+                    ${ip_address}:9081/plugins/${plugin_type} \
+                    -X POST \
+                    -d "{\"name\":\"${plugin_name}\", \"file\":\"file:///var/plugins/${os}/${plugin_type}/${plugin_name}_amd64.zip\", \"shellParas\": [\"2.0.3.1\"]}"
+                else
+                    curl \
+                    ${ip_address}:9081/plugins/${plugin_type} \
+                    -X POST \
+                    -d "{\"name\":\"${plugin_name}\", \"file\":\"file:///var/plugins/${os}/${plugin_type}/${plugin_name}_amd64.zip\"}"
+                fi
+                docker logs ${container_id}
                 [ $plugin_name = $(curl ${ip_address}:9081/plugins/${plugin_type}/${plugin_name} | jq '.name'| sed 's/\"//g' ) ] || exit 1
             done
         - uses: actions/upload-artifact@v2

+ 2 - 2
.github/workflows/run_fvt_tests.yaml

@@ -42,7 +42,7 @@ jobs:
           run: |
             sudo apt update && sudo apt install pkg-config libczmq-dev -y
             make build_with_edgex
-            go build --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq.go
+            go build --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq/zmq.go
         - name: run edgex && emqx && kuiper
           run: |
             sudo ./fvt_scripts/setup_env.sh
@@ -200,4 +200,4 @@ jobs:
               echo -e "---------------------------------------------\n"
               echo "FVT tests error"
               exit 1
-          fi
+          fi

+ 5 - 5
.github/workflows/run_test_case.yaml

@@ -31,11 +31,11 @@ jobs:
             mkdir -p data
             mkdir -p log
             sed -i -r "s/debug: .*/debug: true/1" etc/kuiper.yaml
-            go build --buildmode=plugin -o plugins/sources/Random@v2.0.0.so plugins/sources/random.go
-            go build --buildmode=plugin -o plugins/sinks/File@v1.0.0.so plugins/sinks/file.go
-            go build --buildmode=plugin -o plugins/functions/Echo.so plugins/functions/echo.go
-            go build --buildmode=plugin -o plugins/functions/CountPlusOne@v1.0.0.so plugins/functions/countPlusOne.go
-            go build --buildmode=plugin -o plugins/functions/AccumulateWordCount@v1.0.0.so plugins/functions/accumulateWordCount.go
+            go build --buildmode=plugin -o plugins/sources/Random@v2.0.0.so plugins/sources/random/random.go
+            go build --buildmode=plugin -o plugins/sinks/File@v1.0.0.so plugins/sinks/file/file.go
+            go build --buildmode=plugin -o plugins/functions/Echo.so plugins/functions/echo/echo.go
+            go build --buildmode=plugin -o plugins/functions/CountPlusOne@v1.0.0.so plugins/functions/countPlusOne/countPlusOne.go
+            go build --buildmode=plugin -o plugins/functions/AccumulateWordCount@v1.0.0.so plugins/functions/accumulateWordCount/accumulateWordCount.go
             go test --tags=edgex ./...
         - uses: actions/upload-artifact@v1
           if: failure()

+ 4 - 4
Makefile

@@ -191,8 +191,8 @@ sinks/taos:
     -f .ci/Dockerfile-plugins .
 
 	@mkdir -p _plugins/debian/sinks
-	@tar -xvf /tmp/cross_build_plugins_sinks_taos.tar --wildcards "go/kuiper/_plugins/sinks/taos/taos_amd64.zip" \
-	&& mv go/kuiper/_plugins/sinks/taos/taos_amd64.zip _plugins/debian/sinks
+	@tar -xvf /tmp/cross_build_plugins_sinks_taos.tar --wildcards "go/kuiper/plugins/sinks/taos/taos_amd64.zip" \
+	&& mv go/kuiper/plugins/sinks/taos/taos_amd64.zip _plugins/debian/sinks
 	@rm -f /tmp/cross_build_plugins_sinks_taos.tar
 $(PLUGINS): PLUGIN_TYPE = $(word 1, $(subst /, , $@))
 $(PLUGINS): PLUGIN_NAME = $(word 2, $(subst /, , $@))
@@ -208,8 +208,8 @@ $(PLUGINS):
 
 	@mkdir -p _plugins/debian/$(PLUGIN_TYPE)
 	@for arch in amd64 arm64 arm_v7 386 ppc64le; do \
-		tar -xvf /tmp/cross_build_plugins_$(PLUGIN_TYPE)_$(PLUGIN_NAME).tar --wildcards "linux_$${arch}/go/kuiper/_plugins/$(PLUGIN_TYPE)/$(PLUGIN_NAME)/$(PLUGIN_NAME)_$$(echo $${arch%%_*}).zip" \
-		&& mv $$(ls linux_$${arch}/go/kuiper/_plugins/$(PLUGIN_TYPE)/$(PLUGIN_NAME)/$(PLUGIN_NAME)_$$(echo $${arch%%_*}).zip) _plugins/debian/$(PLUGIN_TYPE); \
+		tar -xvf /tmp/cross_build_plugins_$(PLUGIN_TYPE)_$(PLUGIN_NAME).tar --wildcards "linux_$${arch}/go/kuiper/plugins/$(PLUGIN_TYPE)/$(PLUGIN_NAME)/$(PLUGIN_NAME)_$$(echo $${arch%%_*}).zip" \
+		&& mv $$(ls linux_$${arch}/go/kuiper/plugins/$(PLUGIN_TYPE)/$(PLUGIN_NAME)/$(PLUGIN_NAME)_$$(echo $${arch%%_*}).zip) _plugins/debian/$(PLUGIN_TYPE); \
 	done
 	@rm -f /tmp/cross_build_plugins_$(PLUGIN_TYPE)_$(PLUGIN_NAME).tar
 

+ 1 - 1
deploy/docker/Dockerfile-slim

@@ -15,7 +15,7 @@ COPY ./deploy/docker/docker-entrypoint.sh /usr/bin/docker-entrypoint.sh
 COPY --from=builder /go/kuiper/kuiper_conf_util /usr/bin/kuiper_conf_util
 COPY --from=builder /go/kuiper/_build/kuiper-* /kuiper/
 
-RUN apt update && apt install -y pkg-config libczmq-dev
+RUN apt update && apt install -y pkg-config libczmq-dev wget
 
 WORKDIR /kuiper
 

+ 2 - 1
docs/en_US/plugins/sinks/taos.md

@@ -19,7 +19,8 @@ require (
 go mod edit -replace github.com/emqx/kuiper=/$kuiper
 go build --buildmode=plugin -o /$kuiper/plugins/sinks/Taos@v1.0.0.so /$kuiper/plugins/sinks/taos.go
 ```
-
+### Install plugin
+Since the operation of the taos plug-in depends on the taos client, for the convenience of users, the taos client will be downloaded when the plug-in is installed. However, the taos client version corresponds to the server version one-to-one, which is not compatible with each other, so the user must inform the taos server version used.
 ## Rule Actions Description
 
 As the taos database requires a time stamp field in the table, the user must inform the time stamp field name of the data table (required tsFieldName). The user can choose whether to provide time stamp data. If not (provideTs=false), the content of the time stamp field is automatically generated by the taos database.

+ 2 - 1
docs/zh_CN/plugins/sinks/taos.md

@@ -19,7 +19,8 @@ require (
 go mod edit -replace github.com/emqx/kuiper=/$kuiper
 go build --buildmode=plugin -o /$kuiper/plugins/sinks/Taos@v1.0.0.so /$kuiper/plugins/sinks/taos.go
 ```
-
+### 安装插件
+由于 taos 插件的运行依赖于 taos 客户端,为了便于用户使用,安装插件时将下载 taos 客户端。但是 taos 客户端版本与其服务器版本一一对应,互不兼容,所以用户必须告知所用 taos 服务器版本。
 ## 规则 Actions 说明
 
 由于 taos 数据库要求表中必须有时间戳字段,所以用户必须告知数据表的时间戳字段名称(必填tsFieldName)。用户可以选择是否提供时间戳数据,若不提供(provideTs=false),时间戳字段的内容由 taos 数据库自动生成。

+ 2 - 2
fvt_scripts/prepare_plugins.sh

@@ -16,7 +16,7 @@ if [ -f "$FILE" ]; then
     echo "$FILE exists, not requried to build plugin."
 else
     echo "$FILE does not exist, will build the plugin."
-    go build --buildmode=plugin -o ../plugins/sources/Zmq.so ../plugins/sources/zmq.go
+    go build --buildmode=plugin -o ../plugins/sources/Zmq.so ../plugins/sources/zmq/zmq.go
 fi
 
 mv ../plugins/sources/Zmq.so .
@@ -32,4 +32,4 @@ cd plugins/service/
 export BUILD_ID=dontKillMe
 
 echo "starting mock http server..."
-nohup ./http_server > http_server.out 2>&1 &
+nohup ./http_server > http_server.out 2>&1 &

plugins/functions/accumulateWordCount.go → plugins/functions/accumulateWordCount/accumulateWordCount.go


plugins/functions/countPlusOne.go → plugins/functions/countPlusOne/countPlusOne.go


plugins/functions/echo.go → plugins/functions/echo/echo.go


+ 30 - 8
plugins/manager.go

@@ -2,6 +2,7 @@ package plugins
 
 import (
 	"archive/zip"
+	"bytes"
 	"crypto/tls"
 	"errors"
 	"fmt"
@@ -24,8 +25,9 @@ import (
 )
 
 type Plugin struct {
-	Name string `json:"name"`
-	File string `json:"file"`
+	Name       string   `json:"name"`
+	File       string   `json:"file"`
+	ShellParas []string `json:"shellParas"`
 }
 
 type PluginType int
@@ -247,7 +249,7 @@ func (m *Manager) List(t PluginType) (result []string, err error) {
 }
 
 func (m *Manager) Register(t PluginType, j *Plugin) error {
-	name, uri := j.Name, j.File
+	name, uri, shellParas := j.Name, j.File, j.ShellParas
 	//Validation
 	name = strings.Trim(name, " ")
 	if name == "" {
@@ -275,7 +277,7 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
 		return fmt.Errorf("fail to download file %s: %s", uri, err)
 	}
 	//unzip and copy to destination
-	unzipFiles, version, err := m.install(t, name, zipPath)
+	unzipFiles, version, err := m.install(t, name, zipPath, shellParas)
 	if err != nil {
 		if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
 			os.Remove(unzipFiles[0])
@@ -385,7 +387,7 @@ func getSoFilePath(m *Manager, t PluginType, name string) (string, error) {
 	return p, nil
 }
 
-func (m *Manager) install(t PluginType, name string, src string) ([]string, string, error) {
+func (m *Manager) install(t PluginType, name, src string, shellParas []string) ([]string, string, error) {
 	var filenames []string
 	var tempPath = path.Join(m.pluginDir, "temp", PluginTypes[t], name)
 	defer os.RemoveAll(tempPath)
@@ -403,6 +405,7 @@ func (m *Manager) install(t PluginType, name string, src string) ([]string, stri
 		yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
 		expFiles = 2
 	}
+	var revokeFiles []string
 	needInstall := false
 	for _, file := range r.File {
 		fileName := file.Name
@@ -411,10 +414,14 @@ func (m *Manager) install(t PluginType, name string, src string) ([]string, stri
 			if err != nil {
 				return filenames, "", err
 			}
+			revokeFiles = append(revokeFiles, yamlPath)
 			filenames = append(filenames, yamlPath)
 		} else if fileName == name+".json" {
-			if err := unzipTo(file, path.Join(m.etcDir, PluginTypes[t], fileName)); nil != err {
+			jsonPath := path.Join(m.etcDir, PluginTypes[t], fileName)
+			if err := unzipTo(file, jsonPath); nil != err {
 				common.Log.Errorf("Failed to decompress the metadata %s file", fileName)
+			} else {
+				revokeFiles = append(revokeFiles, jsonPath)
 			}
 		} else if soPrefix.Match([]byte(fileName)) {
 			soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
@@ -423,6 +430,7 @@ func (m *Manager) install(t PluginType, name string, src string) ([]string, stri
 				return filenames, "", err
 			}
 			filenames = append(filenames, soPath)
+			revokeFiles = append(revokeFiles, soPath)
 			_, version = parseName(fileName)
 		} else { //unzip other files
 			err = unzipTo(file, path.Join(tempPath, fileName))
@@ -439,11 +447,25 @@ func (m *Manager) install(t PluginType, name string, src string) ([]string, stri
 	} else if needInstall {
 		//run install script if there is
 		spath := path.Join(tempPath, "install.sh")
-		out, err := exec.Command("/bin/sh", spath).Output()
+		shellParas = append(shellParas, spath)
+		if 1 != len(shellParas) {
+			copy(shellParas[1:], shellParas[0:])
+			shellParas[0] = spath
+		}
+		cmd := exec.Command("/bin/sh", shellParas...)
+		var outb, errb bytes.Buffer
+		cmd.Stdout = &outb
+		cmd.Stderr = &errb
+		err := cmd.Run()
+
 		if err != nil {
+			for _, f := range revokeFiles {
+				os.Remove(f)
+			}
+			common.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
 			return filenames, "", err
 		} else {
-			common.Log.Infof("install %s plugin %s log: %s", PluginTypes[t], name, out)
+			common.Log.Infof("install %s plugin %s", PluginTypes[t], name)
 		}
 	}
 	return filenames, version, nil

plugins/sinks/file.go → plugins/sinks/file/file.go


plugins/sinks/influx.go → plugins/sinks/influx/influx.go


plugins/sinks/memory.go → plugins/sinks/memory/memory.go


+ 48 - 0
plugins/sinks/taos/install.sh

@@ -0,0 +1,48 @@
+#!/bin/sh
+set -e -x -u
+
+if [ -z "$1" ]
+then
+    echo "version is empty."
+	exit 5
+fi
+
+url="https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-$1-Linux-x64.tar.gz"
+zip="TDengine-client.tar.gz"
+wget -T 280 -O "$zip" "$url"
+
+if ! [ -e $zip ]
+then
+	echo "Not downloaded to the installation package."
+	exit 2
+fi
+
+dir="TDengine-client"
+tar -zxvf "$zip"
+rm "$zip"
+
+if ! [ -e $dir ]
+then
+	echo "Failed to decompress Taos client."
+	exit 3
+fi
+
+cd "$dir"
+ret=""
+for file in ./*
+do
+	if [ -x $file -a ! -d $file ]
+	then
+		./"$file"
+		ret="successful"
+	fi
+done
+
+cd ../
+rm -rf "$dir"
+
+if [ -z "$ret" ]
+then
+    echo "not found script."
+	exit 4
+fi

plugins/sinks/taos.go → plugins/sinks/taos/taos.go


plugins/sinks/zmq.go → plugins/sinks/zmq/zmq.go


plugins/sources/random.go → plugins/sources/random/random.go


plugins/sources/zmq.go → plugins/sources/zmq/zmq.go


+ 2 - 2
xstream/server/server/rest.go

@@ -112,8 +112,8 @@ func createRestServer(port int) *http.Server {
 	server := &http.Server{
 		Addr: fmt.Sprintf("0.0.0.0:%d", port),
 		// Good practice to set timeouts to avoid Slowloris attacks.
-		WriteTimeout: time.Second * 15,
-		ReadTimeout:  time.Second * 15,
+		WriteTimeout: time.Second * 60 * 5,
+		ReadTimeout:  time.Second * 60 * 5,
 		IdleTimeout:  time.Second * 60,
 		Handler:      handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
 	}