浏览代码

feat(tool Automatically streams & rules management by monitoring spec… (#352)

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules del file of history

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules
EMQmyd 4 年之前
父节点
当前提交
21783fcb18

+ 301 - 0
tools/kubeedge/README-CN.md

@@ -0,0 +1,301 @@
+## 1 程序说明及其配置:
+
+### 1.1 程序说明:
+
+​    本程序用于处理命令文件夹中的文件,处理完毕后将文件移除。之后程序监控命令文件夹,当命令文件夹下出现文件时重复上述步骤,直到命令文件夹下没有文件为止。程序启动时,用户需要在配置文件中指定命令文件夹的路径,之后用户只需要将编辑好的命令文件放在命令文件夹中即可。
+
+### 1.2 命令文件格式及含义:
+
+| 字段        | 是否必填   | 类型     | 释义         |
+| ----------- | ---------- | -------- | ------------ |
+| commands    | 必填       | array    | 命令集合     |
+| url         | 必填       | string   | http请求路径 |
+| method      | 必填       | string   | http请求方法 |
+| description | 选填       | string   | 操作描述     |
+| data        | 创建时必填 | json obj | 创建内容     |
+|             |            |          |              |
+
+### 1.3 配置文件格式及含义:
+```yaml
+port: 9081  //kuiper 端口
+timeout: 500  //执行一条命令超时时间(单位:毫秒)
+intervalTime: 60  //隔多久检查一次命令文件夹(单位:秒)
+ip: "127.0.0.1" //kuiper ip地址
+logPath: "./log/kubeedge.log" //日志保存路径
+commandDir: "./sample/" //命令文件夹路径
+```
+### 1.4 编译程序:
+
+执行 `go build -o tools/kubeedge/kubeedge tools/kubeedge/main.go` 命令即可生成 kubeedge 程序。
+
+## 2 流的操作示例
+
+### 2.1 创建流 stream1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/streams",
+            "description":"create stream1",
+            "method":"post",
+            "data":{
+                "sql":"create stream stream1 (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\");"
+            }
+        }]
+}
+```
+
+### 2.2 显示流列表
+```json
+{
+    "commands":[
+        {
+            "url":"/streams",
+            "description":"list stream",
+            "method":"get"
+        }]
+}
+```
+
+### 2.3 获取流 stream1
+```json
+{
+    "commands":[
+        {
+            "url":"/streams/stream1",
+            "description":"get stream1",
+            "method":"get"
+        }]
+}
+```
+
+### 2.4 删除流 stream1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/streams/stream1",
+            "description":"del stream1",
+            "method":"delete"
+        }]
+}
+```
+
+## 3 规则的操作示例
+
+### 3.1 创建规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules",
+            "description":"create rule1",
+            "method":"post",
+            "data":{
+                "id":"rule1",
+                "sql":"SELECT * FROM stream1",
+                "actions":[
+                    {
+                        "log":{
+                        }
+                    }]
+            }
+        }]
+}
+```
+
+### 3.2 显示规则列表
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules",
+            "description":"list rule",
+            "method":"get"
+        }]
+}
+```
+
+### 3.3 获取规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1",
+            "description":"get rule1",
+            "method":"get"
+        }]
+}
+```
+
+### 3.4 删除规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1",
+            "description":"del rule1",
+            "method":"delete"
+        }]
+}
+
+```
+### 3.5 停止规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1/stop",
+            "description":"stop rule1",
+            "method":"post"
+        }]
+}
+```
+
+### 3.6 启动规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1/start",
+            "description":"start rule1",
+            "method":"post"
+        }]
+}
+```
+
+### 3.7 重启规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1/restart",
+            "description":"restart rule1",
+            "method":"post"
+        }]
+}
+```
+
+### 3.8 显示规则 rule1 的状态
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1/status",
+            "description":"get rule1 status",
+            "method":"get"
+        }]
+}
+```
+
+## 4 多命令集合示例:
+
+```json
+{
+    "commands":[
+        {
+            "url":"/streams",
+            "description":"create stream1",
+            "method":"post",
+            "data":{
+                "sql":"create stream stream1 (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\");"
+            }
+        },
+        {
+            "url":"/streams",
+            "description":"create stream2",
+            "method":"post",
+            "data":{
+                "sql":"create stream stream2 (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\");"
+            }
+        },
+        {
+            "url":"/streams",
+            "description":"list stream",
+            "method":"get"
+        },
+        {
+            "url":"/streams/stream1",
+            "description":"get stream1",
+            "method":"get"
+        },
+        {
+            "url":"/streams/stream2",
+            "description":"del stream2",
+            "method":"delete"
+        },
+        {
+            "url":"/rules",
+            "description":"create rule1",
+            "method":"post",
+            "data":{
+                "id":"rule1",
+                "sql":"SELECT * FROM stream1",
+                "actions":[
+                    {
+                        "log":{
+                        }
+                    }]
+            }
+        },
+        {
+            "url":"/rules",
+            "description":"create rule2",
+            "method":"post",
+            "data":{
+                "id":"rule2",
+                "sql":"SELECT * FROM stream1",
+                "actions":[
+                    {
+                        "log":{
+                        }
+                    }]
+            }
+        },
+        {
+            "url":"/rules",
+            "description":"list rule",
+            "method":"get"
+        },
+        {
+            "url":"/rules/rule1",
+            "description":"get rule1",
+            "method":"get"
+        },
+        {
+            "url":"/rules/rule2",
+            "description":"del rule2",
+            "method":"delete"
+        },
+        {
+            "url":"/rules/rule1/stop",
+            "description":"stop rule1",
+            "method":"post"
+        },
+        {
+            "url":"/rules/rule1/start",
+            "description":"start rule1",
+            "method":"post"
+        },
+        {
+            "url":"/rules/rule1/restart",
+            "description":"restart rule1",
+            "method":"post"
+        },
+        {
+            "url":"/rules/rule1/status",
+            "description":"get rule1 status",
+            "method":"get"
+        }]
+}
+```

+ 301 - 0
tools/kubeedge/README.md

@@ -0,0 +1,301 @@
+## 1 程序说明及其配置:
+
+### 1.1 程序说明:
+
+​    本程序用于处理命令文件夹中的文件,处理完毕后将文件移除。之后程序监控命令文件夹,当命令文件夹下出现文件时重复上述步骤,直到命令文件夹下没有文件为止。程序启动时,用户需要在配置文件中指定命令文件夹的路径,之后用户只需要将编辑好的命令文件放在命令文件夹中即可。
+
+### 1.2 命令文件格式及含义:
+
+| 字段        | 是否必填   | 类型     | 释义         |
+| ----------- | ---------- | -------- | ------------ |
+| commands    | 必填       | array    | 命令集合     |
+| url         | 必填       | string   | http请求路径 |
+| method      | 必填       | string   | http请求方法 |
+| description | 选填       | string   | 操作描述     |
+| data        | 创建时必填 | json obj | 创建内容     |
+|             |            |          |              |
+
+### 1.3 配置文件格式及含义:
+```yaml
+port: 9081  //kuiper 端口
+timeout: 500  //执行一条命令超时时间(单位:毫秒)
+intervalTime: 60  //隔多久检查一次命令文件夹(单位:秒)
+ip: "127.0.0.1" //kuiper ip地址
+logPath: "./log/kubeedge.log" //日志保存路径
+commandDir: "./sample/" //命令文件夹路径
+```
+### 1.4 编译程序:
+
+执行 `go build -o tools/kubeedge/kubeedge tools/kubeedge/main.go` 命令即可生成 kubeedge 程序。
+
+## 2 流的操作示例
+
+### 2.1 创建流 stream1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/streams",
+            "description":"create stream1",
+            "method":"post",
+            "data":{
+                "sql":"create stream stream1 (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\");"
+            }
+        }]
+}
+```
+
+### 2.2 显示流列表
+```json
+{
+    "commands":[
+        {
+            "url":"/streams",
+            "description":"list stream",
+            "method":"get"
+        }]
+}
+```
+
+### 2.3 获取流 stream1
+```json
+{
+    "commands":[
+        {
+            "url":"/streams/stream1",
+            "description":"get stream1",
+            "method":"get"
+        }]
+}
+```
+
+### 2.4 删除流 stream1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/streams/stream1",
+            "description":"del stream1",
+            "method":"delete"
+        }]
+}
+```
+
+## 3 规则的操作示例
+
+### 3.1 创建规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules",
+            "description":"create rule1",
+            "method":"post",
+            "data":{
+                "id":"rule1",
+                "sql":"SELECT * FROM stream1",
+                "actions":[
+                    {
+                        "log":{
+                        }
+                    }]
+            }
+        }]
+}
+```
+
+### 3.2 显示规则列表
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules",
+            "description":"list rule",
+            "method":"get"
+        }]
+}
+```
+
+### 3.3 获取规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1",
+            "description":"get rule1",
+            "method":"get"
+        }]
+}
+```
+
+### 3.4 删除规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1",
+            "description":"del rule1",
+            "method":"delete"
+        }]
+}
+
+```
+### 3.5 停止规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1/stop",
+            "description":"stop rule1",
+            "method":"post"
+        }]
+}
+```
+
+### 3.6 启动规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1/start",
+            "description":"start rule1",
+            "method":"post"
+        }]
+}
+```
+
+### 3.7 重启规则 rule1
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1/restart",
+            "description":"restart rule1",
+            "method":"post"
+        }]
+}
+```
+
+### 3.8 显示规则 rule1 的状态
+
+```json
+{
+    "commands":[
+        {
+            "url":"/rules/rule1/status",
+            "description":"get rule1 status",
+            "method":"get"
+        }]
+}
+```
+
+## 4 多命令集合示例:
+
+```json
+{
+    "commands":[
+        {
+            "url":"/streams",
+            "description":"create stream1",
+            "method":"post",
+            "data":{
+                "sql":"create stream stream1 (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\");"
+            }
+        },
+        {
+            "url":"/streams",
+            "description":"create stream2",
+            "method":"post",
+            "data":{
+                "sql":"create stream stream2 (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\");"
+            }
+        },
+        {
+            "url":"/streams",
+            "description":"list stream",
+            "method":"get"
+        },
+        {
+            "url":"/streams/stream1",
+            "description":"get stream1",
+            "method":"get"
+        },
+        {
+            "url":"/streams/stream2",
+            "description":"del stream2",
+            "method":"delete"
+        },
+        {
+            "url":"/rules",
+            "description":"create rule1",
+            "method":"post",
+            "data":{
+                "id":"rule1",
+                "sql":"SELECT * FROM stream1",
+                "actions":[
+                    {
+                        "log":{
+                        }
+                    }]
+            }
+        },
+        {
+            "url":"/rules",
+            "description":"create rule2",
+            "method":"post",
+            "data":{
+                "id":"rule2",
+                "sql":"SELECT * FROM stream1",
+                "actions":[
+                    {
+                        "log":{
+                        }
+                    }]
+            }
+        },
+        {
+            "url":"/rules",
+            "description":"list rule",
+            "method":"get"
+        },
+        {
+            "url":"/rules/rule1",
+            "description":"get rule1",
+            "method":"get"
+        },
+        {
+            "url":"/rules/rule2",
+            "description":"del rule2",
+            "method":"delete"
+        },
+        {
+            "url":"/rules/rule1/stop",
+            "description":"stop rule1",
+            "method":"post"
+        },
+        {
+            "url":"/rules/rule1/start",
+            "description":"start rule1",
+            "method":"post"
+        },
+        {
+            "url":"/rules/rule1/restart",
+            "description":"restart rule1",
+            "method":"post"
+        },
+        {
+            "url":"/rules/rule1/status",
+            "description":"get rule1 status",
+            "method":"get"
+        }]
+}
+```

+ 180 - 0
tools/kubeedge/common/conf.go

@@ -0,0 +1,180 @@
+package common
+
+import (
+	"bytes"
+	"fmt"
+	"github.com/go-yaml/yaml"
+	"github.com/sirupsen/logrus"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"path"
+	"path/filepath"
+	"runtime"
+	"time"
+)
+
+type (
+	config struct {
+		Port         int    `yaml:"port"`
+		Timeout      int    `yaml:"timeout"`
+		IntervalTime int    `yaml:"intervalTime"`
+		Ip           string `yaml:"ip"`
+		LogPath      string `yaml:"logPath"`
+		CommandDir   string `yaml:"commandDir"`
+	}
+)
+
+var g_conf config
+
+func GetConf() *config {
+	return &g_conf
+}
+func (this *config) GetIntervalTime() int {
+	return this.IntervalTime
+}
+func (this *config) GetIp() string {
+	return this.Ip
+}
+func (this *config) GetPort() int {
+	return this.Port
+}
+func (this *config) GetLogPath() string {
+	return this.LogPath
+}
+func (this *config) GetCommandDir() string {
+	return this.CommandDir
+}
+
+func processPath(path string) (string, error) {
+	if abs, err := filepath.Abs(path); err != nil {
+		return "", nil
+	} else {
+		if _, err := os.Stat(abs); os.IsNotExist(err) {
+			return "", err
+		}
+		return abs, nil
+	}
+}
+
+func (this *config) initConfig() bool {
+	confPath, err := processPath(os.Args[1])
+	if nil != err {
+		fmt.Println("conf path err : ", err)
+		return false
+	}
+	sliByte, err := ioutil.ReadFile(confPath)
+	if nil != err {
+		fmt.Println("load conf err : ", err)
+		return false
+	}
+	err = yaml.Unmarshal(sliByte, this)
+	if nil != err {
+		fmt.Println("unmashal conf err : ", err)
+		return false
+	}
+
+	if this.CommandDir, err = filepath.Abs(this.CommandDir); err != nil {
+		fmt.Println("command dir err : ", err)
+		return false
+	}
+	if _, err = os.Stat(this.CommandDir); os.IsNotExist(err) {
+		return false
+	}
+
+	if this.LogPath, err = filepath.Abs(this.LogPath); nil != err {
+		fmt.Println("log dir err : ", err)
+		return false
+	}
+	if _, err = os.Stat(this.LogPath); os.IsNotExist(err) {
+		if err = os.MkdirAll(path.Dir(this.LogPath), 0755); nil != err {
+			fmt.Println("mak logdir err : ", err)
+			return false
+		}
+	}
+	return true
+}
+
+var (
+	Log      *logrus.Logger
+	g_client http.Client
+)
+
+func (this *config) initTimeout() {
+	g_client.Timeout = time.Duration(this.Timeout) * time.Millisecond
+}
+
+func (this *config) initLog() bool {
+	Log = logrus.New()
+	Log.SetReportCaller(true)
+	Log.SetFormatter(&logrus.TextFormatter{
+		CallerPrettyfier: func(f *runtime.Frame) (string, string) {
+			filename := path.Base(f.File)
+			return "", fmt.Sprintf("%s:%d", filename, f.Line)
+		},
+		DisableColors: true,
+		FullTimestamp: true,
+	})
+
+	logFile, err := os.OpenFile(this.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+	if err == nil {
+		Log.SetOutput(logFile)
+		return true
+	} else {
+		Log.Infof("Failed to log to file, using default stderr.")
+		return false
+	}
+	return false
+}
+func (this *config) Init() bool {
+	if !this.initConfig() {
+		return false
+	}
+
+	if !this.initLog() {
+		return false
+	}
+	this.initTimeout()
+	return true
+}
+
+func fetchContents(request *http.Request) (data []byte, err error) {
+	respon, err := g_client.Do(request)
+	if nil != err {
+		return nil, err
+	}
+	defer respon.Body.Close()
+	data, err = ioutil.ReadAll(respon.Body)
+	if nil != err {
+		return nil, err
+	}
+	if respon.StatusCode < 200 || respon.StatusCode > 299 {
+		return data, fmt.Errorf("http return code: %d and error message %s.", respon.StatusCode, string(data))
+	}
+	return data, err
+}
+
+func Get(inUrl string) (data []byte, err error) {
+	request, err := http.NewRequest(http.MethodGet, inUrl, nil)
+	if nil != err {
+		return nil, err
+	}
+	return fetchContents(request)
+}
+
+func Post(inHead, inBody string) (data []byte, err error) {
+	request, err := http.NewRequest(http.MethodPost, inHead, bytes.NewBuffer([]byte(inBody)))
+	if nil != err {
+		return nil, err
+	}
+	request.Header.Set("Content-Type", "application/json")
+	return fetchContents(request)
+}
+
+func Delete(inUrl string) (data []byte, err error) {
+	request, err := http.NewRequest(http.MethodDelete, inUrl, nil)
+	if nil != err {
+		return nil, err
+	}
+	return fetchContents(request)
+}

+ 6 - 0
tools/kubeedge/conf/cf.yaml

@@ -0,0 +1,6 @@
+port: 9081
+timeout: 500
+intervalTime: 60
+ip: "127.0.0.1"
+logPath: "./log/kubeedge.log"
+commandDir: "./sample/"

+ 9 - 0
tools/kubeedge/main.go

@@ -0,0 +1,9 @@
+package main
+
+import (
+	"github.com/emqx/kuiper/tools/kubeedge/util"
+)
+
+func main() {
+	util.Process()
+}

+ 99 - 0
tools/kubeedge/sample/sample.json

@@ -0,0 +1,99 @@
+{
+    "commands":[
+        {
+            "url":"/streams",
+            "description":"create stream1",
+            "method":"post",
+            "data":{
+                "sql":"create stream stream1 (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\");"
+            }
+        },
+        {
+            "url":"/streams",
+            "description":"create stream2",
+            "method":"post",
+            "data":{
+                "sql":"create stream stream2 (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\");"
+            }
+        },
+        {
+            "url":"/streams",
+            "description":"list stream",
+            "method":"get"
+        },
+        {
+            "url":"/streams/stream1",
+            "description":"get stream1",
+            "method":"get"
+        },
+        {
+            "url":"/streams/stream2",
+            "description":"del stream2",
+            "method":"delete"
+        },
+        {
+            "url":"/rules",
+            "description":"create rule1",
+            "method":"post",
+            "data":{
+                "id":"rule1",
+                "sql":"SELECT * FROM stream1",
+                "actions":[
+                    {
+                        "log":{
+                        }
+                    }]
+            }
+        },
+        {
+            "url":"/rules",
+            "description":"create rule2",
+            "method":"post",
+            "data":{
+                "id":"rule2",
+                "sql":"SELECT * FROM stream1",
+                "actions":[
+                    {
+                        "log":{
+                        }
+                    }]
+            }
+        },
+        {
+            "url":"/rules",
+            "description":"list rule",
+            "method":"get"
+        },
+        {
+            "url":"/rules/rule1",
+            "description":"get rule1",
+            "method":"get"
+        },
+        {
+            "url":"/rules/rule2",
+            "description":"del rule2",
+            "method":"delete"
+        },
+        {
+            "url":"/rules/rule1/stop",
+            "description":"stop rule1",
+            "method":"post"
+        },
+        {
+            "url":"/rules/rule1/start",
+            "description":"start rule1",
+            "method":"post"
+        },
+        {
+            "url":"/rules/rule1/restart",
+            "description":"restart rule1",
+            "method":"post"
+        },
+        {
+            "url":"/rules/rule1/status",
+            "description":"get rule1 status",
+            "method":"get"
+        }]
+}
+
+

+ 163 - 0
tools/kubeedge/util/util.go

@@ -0,0 +1,163 @@
+package util
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/emqx/kuiper/tools/kubeedge/common"
+	"io/ioutil"
+	"os"
+	"path"
+	"strconv"
+	"time"
+)
+
+type (
+	command struct {
+		Url         string      `json:"url"`
+		Description string      `json:"description"`
+		Method      string      `json:"method"`
+		Data        interface{} `json:"data"`
+		strLog      string
+	}
+	fileData struct {
+		Commands []*command `json:"commands"`
+	}
+)
+
+func (this *command) getLog() string {
+	return this.strLog
+}
+
+func (this *command) call(host string) bool {
+	var resp []byte
+	var err error
+	head := host + this.Url
+	body, _ := json.Marshal(this.Data)
+	switch this.Method {
+	case "post", "POST":
+		resp, err = common.Post(head, string(body))
+		break
+	case "get", "GET":
+		resp, err = common.Get(head)
+		break
+	case "delete", "DELETE":
+		resp, err = common.Delete(head)
+		break
+	default:
+		this.strLog = fmt.Sprintf("no such method : %s", this.Method)
+		return false
+	}
+	if nil == err {
+		this.strLog = fmt.Sprintf("%s:%s resp:%s", head, this.Method, string(resp))
+		return true
+	}
+	this.strLog = fmt.Sprintf("%s:%s resp:%s err:%v", head, this.Method, string(resp), err)
+	return false
+}
+
+type (
+	server struct {
+		dirCommand string
+		dirHistory string
+		logs       []string
+	}
+)
+
+func (this *server) setDirCommand(dir string) {
+	this.dirCommand = dir
+}
+func (this *server) setDirHistory(dir string) {
+	this.dirHistory = dir
+}
+
+func (this *server) getLogs() []string {
+	return this.logs
+}
+func (this *server) printLogs() {
+	for _, v := range this.logs {
+		common.Log.Info(v)
+	}
+	this.logs = this.logs[:0]
+}
+
+func (this *server) init() bool {
+	conf := common.GetConf()
+	dirCommand := conf.GetCommandDir()
+	dirHistory := path.Join(path.Dir(dirCommand), ".history")
+	if err := os.MkdirAll(dirHistory, 0755); nil != err {
+		this.logs = append(this.logs, fmt.Sprintf("mkdir history dir:%v", err))
+		return false
+	}
+	this.dirCommand = dirCommand
+	this.dirHistory = dirHistory
+	return true
+}
+
+func (this *server) processDir() bool {
+	infos, err := ioutil.ReadDir(this.dirCommand)
+	if nil != err {
+		this.logs = append(this.logs, fmt.Sprintf("read command dir:%v", err))
+		return false
+	}
+	conf := common.GetConf()
+	host := fmt.Sprintf(`http://%s:%d`, conf.GetIp(), conf.GetPort())
+	for _, info := range infos {
+		filePath := path.Join(this.dirCommand, info.Name())
+		file := new(fileData)
+		sliByte, err := ioutil.ReadFile(filePath)
+		if nil != err {
+			this.logs = append(this.logs, fmt.Sprintf("load command file:%v", err))
+			return false
+		}
+		err = json.Unmarshal(sliByte, file)
+		if nil != err {
+			this.logs = append(this.logs, fmt.Sprintf("unmarshal command file:%v", err))
+			return false
+		}
+
+		for _, command := range file.Commands {
+			flag := command.call(host)
+			this.logs = append(this.logs, command.getLog())
+			if !flag {
+				return false
+			}
+		}
+		newFileName := info.Name() + "_" + strconv.FormatInt(time.Now().Unix(), 10)
+		newFilePath := path.Join(this.dirHistory, newFileName)
+		os.Rename(filePath, newFilePath)
+	}
+	return true
+}
+
+func (this *server) watchFolders() {
+	conf := common.GetConf()
+	this.processDir()
+	this.printLogs()
+	chTime := time.Tick(time.Second * time.Duration(conf.GetIntervalTime()))
+	for {
+		select {
+		case <-chTime:
+			this.processDir()
+			this.printLogs()
+		}
+	}
+}
+
+func Process() {
+	if len(os.Args) != 2 {
+		common.Log.Fatal("Missing configuration file")
+		return
+	}
+
+	conf := common.GetConf()
+	if !conf.Init() {
+		return
+	}
+
+	se := new(server)
+	if !se.init() {
+		se.printLogs()
+		return
+	}
+	se.watchFolders()
+}

+ 98 - 0
tools/kubeedge/util/util_test.go

@@ -0,0 +1,98 @@
+package util
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"testing"
+)
+
+func TestCall(t *testing.T) {
+	var tests = []struct {
+		cmd command
+		exp bool
+	}{
+		{
+			cmd: command{
+				Url:    `/streams`,
+				Method: `post`,
+				Data:   struct{ sql string }{sql: `create stream stream1 (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\");`},
+			},
+			exp: true,
+		},
+		{
+			cmd: command{
+				Url:    `/streams`,
+				Method: `get`,
+			},
+			exp: true,
+		},
+		{
+			cmd: command{
+				Url:    `/streams`,
+				Method: `put`,
+			},
+			exp: false,
+		},
+		{
+			cmd: command{
+				Url:    `/rules`,
+				Method: `post`,
+				Data: struct {
+					id      string
+					sql     string
+					actions []struct{ log struct{} }
+				}{
+					id:  `ruler1`,
+					sql: `SELECT * FROM stream1`,
+				},
+			},
+			exp: true,
+		},
+		{
+			cmd: command{
+				Url:    `/rules`,
+				Method: `get`,
+			},
+			exp: true,
+		},
+		{
+			cmd: command{
+				Url:    `/rules/rule1`,
+				Method: `get`,
+			},
+			exp: true,
+		},
+		{
+			cmd: command{
+				Url:    `/rules/rule2`,
+				Method: `delete`,
+			},
+			exp: true,
+		},
+		{
+			cmd: command{
+				Url:    `/rules/rule1/stop`,
+				Method: `post`,
+			},
+			exp: true,
+		},
+		{
+			cmd: command{
+				Url:    `/rules/rule1/start`,
+				Method: `post`,
+			},
+			exp: true,
+		},
+	}
+
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+	}))
+	defer ts.Close()
+
+	for _, v := range tests {
+		ret := v.cmd.call(ts.URL)
+		if v.exp != ret {
+			t.Errorf("url:%s method:%s log:%s\n", v.cmd.Url, v.cmd.Method, v.cmd.getLog())
+		}
+	}
+}