Forráskód Böngészése

feat: get available plugins

RockyJin 4 éve
szülő
commit
eccd0c615c

+ 135 - 0
common/os_util.go

@@ -0,0 +1,135 @@
+package common
+
+import (
+	"bufio"
+	"bytes"
+	"errors"
+	"os"
+	"strings"
+)
+
+const EtcOsRelease string = "/etc/os-release"
+const UsrLibOsRelease string = "/usr/lib/os-release"
+
+// Read and return os-release, trying EtcOsRelease, followed by UsrLibOsRelease.
+// err will contain an error message if neither file exists or failed to parse
+func Read() (osrelease map[string]string, err error) {
+	osrelease, err = ReadFile(EtcOsRelease)
+	if err != nil {
+		osrelease, err = ReadFile(UsrLibOsRelease)
+	}
+	return
+}
+
+// Similar to Read(), but takes the name of a file to load instead
+func ReadFile(filename string) (osrelease map[string]string, err error) {
+	osrelease = make(map[string]string)
+	err = nil
+
+	lines, err := parseFile(filename)
+	if err != nil {
+		return
+	}
+
+	for _, v := range lines {
+		key, value, err := parseLine(v)
+		if err == nil {
+			osrelease[key] = value
+		}
+	}
+	return
+}
+
+// ReadString is similar to Read(), but takes a string to load instead
+func ReadString(content string) (osrelease map[string]string, err error) {
+	osrelease = make(map[string]string)
+	err = nil
+
+	lines, err := parseString(content)
+	if err != nil {
+		return
+	}
+
+	for _, v := range lines {
+		key, value, err := parseLine(v)
+		if err == nil {
+			osrelease[key] = value
+		}
+	}
+	return
+}
+
+func parseFile(filename string) (lines []string, err error) {
+	file, err := os.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	defer file.Close()
+
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		lines = append(lines, scanner.Text())
+	}
+	return lines, scanner.Err()
+}
+
+func parseString(content string) (lines []string, err error) {
+	in := bytes.NewBufferString(content)
+	reader := bufio.NewReader(in)
+	scanner := bufio.NewScanner(reader)
+
+	for scanner.Scan() {
+		lines = append(lines, scanner.Text())
+	}
+	return lines, scanner.Err()
+
+}
+
+func parseLine(line string) (key string, value string, err error) {
+	err = nil
+
+	// skip empty lines
+	if len(line) == 0 {
+		err = errors.New("Skipping: zero-length")
+		return
+	}
+
+	// skip comments
+	if line[0] == '#' {
+		err = errors.New("Skipping: comment")
+		return
+	}
+
+	// try to split string at the first '='
+	splitString := strings.SplitN(line, "=", 2)
+	if len(splitString) != 2 {
+		err = errors.New("Can not extract key=value")
+		return
+	}
+
+	// trim white space from key and value
+	key = splitString[0]
+	key = strings.Trim(key, " ")
+	value = splitString[1]
+	value = strings.Trim(value, " ")
+
+	// Handle double quotes
+	if strings.ContainsAny(value, `"`) {
+		first := string(value[0:1])
+		last := string(value[len(value)-1:])
+
+		if first == last && strings.ContainsAny(first, `"'`) {
+			value = strings.TrimPrefix(value, `'`)
+			value = strings.TrimPrefix(value, `"`)
+			value = strings.TrimSuffix(value, `'`)
+			value = strings.TrimSuffix(value, `"`)
+		}
+	}
+
+	// expand anything else that could be escaped
+	value = strings.Replace(value, `\"`, `"`, -1)
+	value = strings.Replace(value, `\$`, `$`, -1)
+	value = strings.Replace(value, `\\`, `\`, -1)
+	value = strings.Replace(value, "\\`", "`", -1)
+	return
+}

+ 1 - 0
common/util.go

@@ -67,6 +67,7 @@ type KuiperConf struct {
 		RestTls        *tlsConf `yaml:"restTls"`
 		Prometheus     bool     `yaml:"prometheus"`
 		PrometheusPort int      `yaml:"prometheusPort"`
+		PluginHosts    string   `yaml:pluginHosts`
 	}
 	Rule api.RuleOption
 	Sink struct {

+ 52 - 1
docs/en_US/operation/configuration_file.md

@@ -47,7 +47,58 @@ basic:
   prometheus: true
   prometheusPort: 20499
 ```
-For such a default configuration, Kuiper will export metrics and serve prometheus at ``http://localhost:20499/metrics``
+For such a default configuration, Kuiper will export metrics and serve prometheus at `http://localhost:20499/metrics`
+
+## Pluginhosts
+
+The URL where hosts all of pre-build plugins. By default it's at `packages.emqx.io`. There could be several hosts (host can be separated with comma), if same package could be found in the several hosts, then the package in the 1st host will have the highest priority.
+
+```yaml
+pluginHosts: https://packages.emqx.io
+```
+
+It could be also as following, you can specify a local repository, and the plugin in that repository will have higher priorities.
+
+```yaml
+pluginHosts: https://local.repo.net, https://packages.emqx.io
+```
+
+The directory structure of the plugins should be similar as following.
+
+```
+http://host:port/kuiper-plugins/0.9.1/sinks/alpine
+```
+
+The content of the page should be similar as below.
+
+```html
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+<title>Directory listing for enterprise: /4.1.1/</title>
+<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+<meta name="robots" content="noindex,nofollow">
+<body>
+	<h2>Directory listing for enterprise: /4.1.1/</h2>
+	<hr>
+	<ul>
+		<li><a href="file_386.zip">file_386.zip</a>
+		<li><a href="file_amd64.zip">file_amd64.zip</a>
+		<li><a href="file_arm.zip">file_arm.zip</a>
+		<li><a href="file_arm64.zip">file_arm64.zip</a>
+		<li><a href="file_ppc64le.zip">file_ppc64le.zip</a>
+
+		<li><a href="influx_386.zip">influx_386.zip</a>
+		<li><a href="influx_amd64.zip">influx_amd64.zip</a>
+		<li><a href="influx_arm.zip">influx_arm.zip</a>
+		<li><a href="influx_arm64.zip">influx_arm64.zip</a>
+		<li><a href="influx_ppc64le.zip">influx_ppc64le.zip</a>
+	</ul>
+	<hr>
+</body>
+</html>
+```
+
+
 
 ## Sink configurations
 

+ 20 - 0
docs/en_US/restapi/plugins.md

@@ -127,4 +127,24 @@ DELETE http://localhost:8080/plugins/functions/{name}
 The user can pass a query parameter to decide if Kuiper should be stopped after a delete in order to make the deletion take effect. The parameter is `restart` and only when the value is `1` will the Kuiper be stopped. The user has to manually restart it.
 ```shell
 DELETE http://localhost:8080/plugins/sources/{name}?restart=1
+```
+
+## Get the available plugins
+
+According to the configuration `pluginHosts` in file `etc/kuiper.yaml` ,  it returns the plugins list that can be installed at local run Kuiper instance. By default, it get the list from `https://packages.emqx.io` .
+
+```
+GET http://localhost:9081/plugins/sources/prebuild
+GET http://localhost:9081/plugins/sinks/prebuild
+GET http://localhost:9081/plugins/functions/prebuild
+```
+
+The sample result is as following, and the key is plugin name, the value is plugin download address.
+
+```json
+{
+  "file": "http://127.0.0.1:63767/kuiper-plugins/0.9.1/sinks/alpine/file_arm64.zip",
+  "influx": "http://127.0.0.1:63767/kuiper-plugins/0.9.1/sinks/alpine/influx_arm64.zip",
+  "zmq": "http://127.0.0.1:63768/kuiper-plugins/0.9.1/sinks/alpine/zmq_arm64.zip"
+}
 ```

+ 52 - 1
docs/zh_CN/operation/configuration_file.md

@@ -48,7 +48,58 @@ basic:
 ```
 在如上默认配置中,Kuiper 暴露于 Prometheusd 运行指标可通过 `http://localhost:20499/metrics` 访问。
 
-## Sink configurations
+## Pluginhosts 配置
+
+The URL where hosts all of pre-build plugins. By default it's at `packages.emqx.io`. There could be several hosts (host can be separated with comma), if same package could be found in the several hosts, then the package in the 1st host will have the highest priority.
+
+```yaml
+pluginHosts: https://packages.emqx.io
+```
+
+It could be also as following, you can specify a local repository, and the plugin in that repository will have higher priorities.
+
+```yaml
+pluginHosts: https://local.repo.net, https://packages.emqx.io
+```
+
+The directory structure of the plugins should be similar as following.
+
+```
+http://host:port/kuiper-plugins/0.9.1/sinks/alpine
+```
+
+The content of the page should be similar as below.
+
+```html
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+<title>Directory listing for enterprise: /4.1.1/</title>
+<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+<meta name="robots" content="noindex,nofollow">
+<body>
+	<h2>Directory listing for enterprise: /4.1.1/</h2>
+	<hr>
+	<ul>
+		<li><a href="file_386.zip">file_386.zip</a>
+		<li><a href="file_amd64.zip">file_amd64.zip</a>
+		<li><a href="file_arm.zip">file_arm.zip</a>
+		<li><a href="file_arm64.zip">file_arm64.zip</a>
+		<li><a href="file_ppc64le.zip">file_ppc64le.zip</a>
+
+		<li><a href="influx_386.zip">influx_386.zip</a>
+		<li><a href="influx_amd64.zip">influx_amd64.zip</a>
+		<li><a href="influx_arm.zip">influx_arm.zip</a>
+		<li><a href="influx_arm64.zip">influx_arm64.zip</a>
+		<li><a href="influx_ppc64le.zip">influx_ppc64le.zip</a>
+	</ul>
+	<hr>
+</body>
+</html>
+```
+
+
+
+## Sink 配置
 
 ```yaml
   #The cache persistence threshold size. If the message in sink cache is larger than 10, then it triggers persistence. If you find the remote system is slow to response, or sink throughput is small, then it's recommend to increase below 2 configurations.More memory is required with the increase of below 2 configurations.

+ 20 - 0
docs/zh_CN/restapi/plugins.md

@@ -129,4 +129,24 @@ DELETE http://localhost:8080/plugins/functions/{name}
 
 ```shell
 DELETE http://localhost:8080/plugins/sources/{name}?restart=1
+```
+
+## 获取可安装的插件
+
+根据在 `etc/kuiper.yaml` 文件中 `pluginHosts` 的配置,获取适合本 Kuiper 实例运行的插件列表,缺省会从 `https://packages.emqx.io` 上去获取。
+
+```
+GET http://localhost:9081/plugins/sources/prebuild
+GET http://localhost:9081/plugins/sinks/prebuild
+GET http://localhost:9081/plugins/functions/prebuild
+```
+
+样例返回内容如下,其中键值为插件名称,值是插件的下载地址。
+
+```json
+{
+  "file": "http://127.0.0.1:63767/kuiper-plugins/0.9.1/sinks/alpine/file_arm64.zip",
+  "influx": "http://127.0.0.1:63767/kuiper-plugins/0.9.1/sinks/alpine/influx_arm64.zip",
+  "zmq": "http://127.0.0.1:63768/kuiper-plugins/0.9.1/sinks/alpine/zmq_arm64.zip"
+}
 ```

+ 4 - 0
etc/kuiper.yaml

@@ -15,6 +15,10 @@ basic:
   # Prometheus settings
   prometheus: false
   prometheusPort: 20499
+  # The URL where hosts all of pre-build plugins. By default it's at packages.emqx.io
+  # There could be several hosts (host can be separated with comma), if same package could be found in the several hosts,
+  # then the package in the 1st host will have the highest priority.
+  pluginHosts: https://packages.emqx.io
 
 # The default options for all rules. Each rule can override this setting by defining its own option
 rule:

+ 17 - 20
etc/sources/httppull.yaml

@@ -1,25 +1,22 @@
-#Global httppull configurations
+ck1:
+  headers:
+    Accept: application/json
+  interval: 1000
+  method: get
+  url: 127.0.0.1:9527
+ck2:
+  interval: 100
+  method: delete
+  url: http://localhost:9090/pull
 default:
-  # url of the request server address
-  url: http://localhost
-  # post, get, put, delete
-  method: post
-  # The interval between the requests, time unit is ms
-  interval: 10000
-  # The timeout for http request, time unit is ms
-  timeout: 5000
-  # If it's set to true, then will compare with last result; If response of two requests are the same, then will skip sending out the result.
-  # The possible setting could be: true/false
-  incremental: false
-  # The body of request, such as '{"data": "data", "method": 1}'
   body: '{}'
-  # Body type, none|text|json|html|xml|javascript|form
   bodyType: json
-  # HTTP headers required for the request
   headers:
     Accept: application/json
-
-#Override the global configurations
-application_conf: #Conf_key
-  incremental: true
-  url: http://localhost:9090/pull
+  interval: 10000
+  method: post
+  timeout: 5000
+  url: http://localhost
+new:
+  headers: {}
+  url: 127.0.0.1

+ 1 - 0
go.mod

@@ -18,6 +18,7 @@ require (
 	github.com/prometheus/client_golang v1.2.1
 	github.com/sirupsen/logrus v1.4.2
 	github.com/urfave/cli v1.22.0
+	golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553
 )
 
 go 1.13

+ 130 - 0
xstream/server/server/rest.go

@@ -8,10 +8,12 @@ import (
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/gorilla/handlers"
 	"github.com/gorilla/mux"
+	"golang.org/x/net/html"
 	"io"
 	"io/ioutil"
 	"net/http"
 	"runtime"
+	"strings"
 	"time"
 )
 
@@ -82,10 +84,14 @@ func createRestServer(port int) *http.Server {
 	r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
 
 	r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
 	r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
+
 	r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
 	r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
 	r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
 	r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
 
 	r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
@@ -411,6 +417,130 @@ func functionHandler(w http.ResponseWriter, r *http.Request) {
 	pluginHandler(w, r, plugins.FUNCTION)
 }
 
+func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
+	prebuildPluginsHandler(w, r, plugins.SOURCE)
+}
+
+func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
+	prebuildPluginsHandler(w, r, plugins.SINK)
+}
+
+func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
+	prebuildPluginsHandler(w, r, plugins.FUNCTION)
+}
+
+type PrebuildPluginError struct {
+	Error string
+}
+
+func isOffcialDockerImage() bool {
+	return true
+}
+
+func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
+	if runtime.GOOS != "linux" {
+		handleError(w, fmt.Errorf("Plugins can be only installed at Linux."), "", logger)
+		return
+	} else if !isOffcialDockerImage() {
+		handleError(w, fmt.Errorf("Plugins can be only installed at official released Docker images."), "", logger)
+		return
+	} else if runtime.GOOS == "linux" {
+		osrelease, err := common.Read()
+		if err != nil {
+			logger.Infof("")
+			return
+		}
+		prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
+		if strings.Contains(prettyName, "ALPINE") || strings.Contains(prettyName, "DEBIAN") {
+			hosts := common.Config.Basic.PluginHosts
+			ptype := "sources"
+			if t == plugins.SINK {
+				ptype = "sinks"
+			} else if t == plugins.FUNCTION {
+				ptype = "functions"
+			}
+			if err, plugins := fetchPluginList(hosts, ptype, strings.ToLower(prettyName), runtime.GOARCH); err != nil {
+				handleError(w, err, "", logger)
+			} else {
+				jsonResponse(plugins, w, logger)
+			}
+		} else {
+			handleError(w, fmt.Errorf("Only ALPINE & DEBIAN docker images are supported."), "", logger)
+			return
+		}
+	} else {
+		handleError(w, fmt.Errorf("Please use official Kuiper docker images to install the plugins."), "", logger)
+	}
+}
+
+func fetchPluginList(hosts, ptype, os, arch string) (err error, result map[string]string) {
+	if hosts == "" || ptype == "" || os == "" {
+		return fmt.Errorf("Invalid parameter value: hosts, ptype and os value should not be empty."), nil
+	}
+	result = make(map[string]string)
+	hostsArr := strings.Split(hosts, ",")
+	for _, host := range hostsArr {
+		tmp := []string{host, "kuiper-plugins", version, ptype, os}
+		//The url is similar to http://host:port/kuiper-plugins/0.9.1/sinks/alpine
+		url := strings.Join(tmp, "/")
+		resp, err := http.Get(url)
+
+		if err != nil {
+			return err, nil
+		}
+		defer resp.Body.Close()
+
+		if resp.StatusCode != http.StatusOK {
+			return fmt.Errorf("Status error: %v", resp.StatusCode), nil
+		}
+		data, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return err, nil
+		}
+		plugins := extractFromHtml(string(data), arch)
+		for _, p := range plugins {
+			//If already existed, using the existed.
+			if _, ok := result[p]; !ok {
+				result[p] = url + "/" + p + "_" + arch + ".zip"
+			}
+		}
+	}
+	return
+}
+
+func extractFromHtml(content, arch string) []string {
+	plugins := []string{}
+	htmlTokens := html.NewTokenizer(strings.NewReader(content))
+loop:
+	for {
+		tt := htmlTokens.Next()
+		switch tt {
+		case html.ErrorToken:
+			break loop
+		case html.StartTagToken:
+			t := htmlTokens.Token()
+			isAnchor := t.Data == "a"
+			if isAnchor {
+				found := false
+				for _, prop := range t.Attr {
+					if strings.ToUpper(prop.Key) == "HREF" {
+						if strings.HasSuffix(prop.Val, "_"+arch+".zip") {
+							if index := strings.LastIndex(prop.Val, "_"); index != -1 {
+								plugins = append(plugins, prop.Val[0:index])
+							}
+						}
+						found = true
+					}
+				}
+				if !found {
+					logger.Infof("Invalid plugin download link %s", t)
+				}
+			}
+		}
+	}
+	return plugins
+}
+
 //list sink plugin
 func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
 	defer r.Body.Close()

+ 244 - 0
xstream/server/server/rest_test.go

@@ -0,0 +1,244 @@
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"reflect"
+	"strings"
+	"testing"
+)
+
+func TestParseHtml(t1 *testing.T) {
+	var tests = []struct {
+		html    string
+		plugins []string
+		arch    string
+		error   string
+	}{
+		{
+			html: `<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN"><html>
+			<title>Directory listing for enterprise: /4.1.1/</title>
+			<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+			<meta name="robots" content="noindex,nofollow">
+			<body>
+			<h2>Directory listing for enterprise: /4.1.1/</h2>
+			<hr>
+			<ul>
+				<li><a href="file_386.zip">file_386.zip</a>
+				<li><a href="file_amd64.zip">file_amd64.zip</a>
+				<li><a href="file_arm.zip">file_arm.zip</a>
+				<li><a href="file_arm64.zip">file_arm64.zip</a>
+				<li><a href="file_ppc64le.zip">file_ppc64le.zip</a>
+
+				<li><a href="influx_386.zip">influx_386.zip</a>
+				<li><a href="influx_amd64.zip">influx_amd64.zip</a>
+				<li><a href="influx_arm.zip">influx_arm.zip</a>
+				<li><a href="influx_arm64.zip">influx_arm64.zip</a>
+				<li><a href="influx_ppc64le.zip">influx_ppc64le.zip</a>
+			</ul>
+			<hr>
+			</body>
+			</html>
+			`,
+			arch:    "arm64",
+			plugins: []string{"file", "influx"},
+			error:   "",
+		},
+
+		{
+			html: `<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN"><html>
+			<title>Directory listing for enterprise: /4.1.1/</title>
+			<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+			<meta name="robots" content="noindex,nofollow">
+			<body>
+			<h2>Directory listing for enterprise: /4.1.1/</h2>
+			<hr>
+			<ul>
+				<li><a href="file_386.zip">file_386.zip</a>
+				<li><a href="file_amd64.zip">file_amd64.zip</a>
+				<li><a href="file_arm.zip">file_arm.zip</a>
+				<li><a href="file_arm64.zip">file_arm64.zip</a>
+				<li><a href="file_ppc64le.zip">file_ppc64le.zip</a>
+
+				<li><a href="influx_386.zip">influx_386.zip</a>
+				<li><a href="influx_amd64.zip">influx_amd64.zip</a>
+				<li><a href="influx_arm.zip">influx_arm.zip</a>
+				<li><a href="influx_arm64.zip">influx_arm64.zip</a>
+				<li><a href="influx_ppc64le.zip">influx_ppc64le.zip</a>
+			</ul>
+			<hr>
+			</body>
+			</html>
+			`,
+			arch:    "arm7",
+			plugins: []string{},
+			error:   "",
+		},
+
+		{
+			html: `<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN"><html>
+			<title>Directory listing for enterprise: /4.1.1/</title>
+			<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+			<meta name="robots" content="noindex,nofollow">
+			<body>
+			<h2>Directory listing for enterprise: /4.1.1/</h2>
+			<hr>
+			<ul>
+				<li><a href="file_386.zip">file_386.zip</a>
+				<li><a href="file_amd64.zip">file_amd64.zip</a>
+				<li><a href="file_arm.zip">file_arm.zip</a>
+				<li><a href="file_arm64.zip">file_arm64.zip</a>
+				<li><a href="file_ppc64le.zip">file_ppc64le.zip</a>
+
+				<li><a href="influx_arm.zip">influx_arm.zip</a>
+				<li><a href="influx_arm64.zip">influx_arm64.zip</a>
+				<li><a href="influx_ppc64le.zip">influx_ppc64le.zip</a>
+			</ul>
+			<hr>
+			</body>
+			</html>
+			`,
+			arch:    "amd64",
+			plugins: []string{"file"},
+			error:   "",
+		},
+
+		{
+			html: `<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN"><html>
+			<title>Directory listing for enterprise: /4.1.1/</title>
+			<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+			<meta name="robots" content="noindex,nofollow">
+			<body>
+			<h2>Directory listing for enterprise: /4.1.1/</h2>
+			<hr>
+			<ul>
+				
+			</ul>
+			<hr>
+			</body>
+			</html>
+			`,
+			arch:    "amd64",
+			plugins: []string{},
+			error:   "",
+		},
+
+		{
+			html:    ``,
+			arch:    "amd64",
+			plugins: []string{},
+			error:   "",
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, t := range tests {
+		result := extractFromHtml(t.html, t.arch)
+		if t.error == "" && !reflect.DeepEqual(t.plugins, result) {
+			t1.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, t.html, t.plugins, result)
+		}
+	}
+}
+
+func TestFetchPluginList(t1 *testing.T) {
+	version = "0.9.1"
+	// Start a local HTTP server
+	server1 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		// Send response to be tested
+		if _, err := rw.Write([]byte(`<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN"><html>
+			<title>Directory listing for enterprise: /4.1.1/</title>
+			<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+			<meta name="robots" content="noindex,nofollow">
+			<body>
+			<h2>Directory listing for enterprise: /4.1.1/</h2>
+			<hr>
+			<ul>
+				<li><a href="file_386.zip">file_386.zip</a>
+				<li><a href="file_amd64.zip">file_amd64.zip</a>
+				<li><a href="file_arm.zip">file_arm.zip</a>
+				<li><a href="file_arm64.zip">file_arm64.zip</a>
+				<li><a href="file_ppc64le.zip">file_ppc64le.zip</a>
+
+				<li><a href="influx_386.zip">influx_386.zip</a>
+				<li><a href="influx_amd64.zip">influx_amd64.zip</a>
+				<li><a href="influx_arm.zip">influx_arm.zip</a>
+				<li><a href="influx_arm64.zip">influx_arm64.zip</a>
+				<li><a href="influx_ppc64le.zip">influx_ppc64le.zip</a>
+			</ul>
+			<hr>
+			</body>
+			</html>
+			`)); err != nil {
+			fmt.Printf("%s", err)
+		}
+
+	}))
+
+	server2 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		// Send response to be tested
+		if _, err := rw.Write([]byte(`<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN"><html>
+			<title>Directory listing for enterprise: /4.1.1/</title>
+			<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+			<meta name="robots" content="noindex,nofollow">
+			<body>
+			<h2>Directory listing for enterprise: /4.1.1/</h2>
+			<hr>
+			<ul>
+				<li><a href="file_arm64.zip">file_arm64.zip</a>
+
+				<li><a href="zmq_386.zip">influx_386.zip</a>
+				<li><a href="zmq_amd64.zip">influx_amd64.zip</a>
+				<li><a href="zmq_arm.zip">influx_arm.zip</a>
+				<li><a href="zmq_arm64.zip">influx_arm64.zip</a>
+				<li><a href="zmq_ppc64le.zip">influx_ppc64le.zip</a>
+			</ul>
+			<hr>
+			</body>
+			</html>
+			`)); err != nil {
+			fmt.Printf("%s", err)
+		}
+
+	}))
+
+	// Close the server when test finishes
+	defer server2.Close()
+
+	if e, r := fetchPluginList(strings.Join([]string{server1.URL, server2.URL}, ","), "sinks", "alpine", "arm64"); e != nil {
+		t1.Errorf("Error: %v", e)
+	} else {
+		exp := map[string]string{
+			"file":   server1.URL + "/kuiper-plugins/" + version + "/sinks/alpine/file_arm64.zip",
+			"influx": server1.URL + "/kuiper-plugins/" + version + "/sinks/alpine/influx_arm64.zip",
+			"zmq":    server2.URL + "/kuiper-plugins/" + version + "/sinks/alpine/zmq_arm64.zip",
+		}
+		d, _ := json.Marshal(exp)
+		fmt.Println(string(d))
+		if !reflect.DeepEqual(exp, r) {
+			t1.Errorf("result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", exp, r)
+		}
+	}
+
+	if e, r := fetchPluginList(strings.Join([]string{server2.URL}, ","), "sinks", "alpine", "arm64"); e != nil {
+		t1.Errorf("Error: %v", e)
+	} else {
+		exp := map[string]string{
+			"zmq":  server2.URL + "/kuiper-plugins/" + version + "/sinks/alpine/zmq_arm64.zip",
+			"file": server2.URL + "/kuiper-plugins/" + version + "/sinks/alpine/file_arm64.zip",
+		}
+		if !reflect.DeepEqual(exp, r) {
+			t1.Errorf("result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", exp, r)
+		}
+	}
+
+	if e, r := fetchPluginList(strings.Join([]string{server1.URL, server2.URL}, ","), "sinks", "alpine", "armv7"); e != nil {
+		t1.Errorf("Error: %v", e)
+	} else {
+		exp := map[string]string{}
+		if !reflect.DeepEqual(exp, r) {
+			t1.Errorf("result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", exp, r)
+		}
+	}
+}