Kaynağa Gözat

333 (#361)

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules del file of history

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules

* feat(tool):tomatically streams & rules
EMQmyd 4 yıl önce
ebeveyn
işleme
353613abe0

+ 9 - 1
tools/kubeedge/README-CN.md

@@ -2,7 +2,15 @@
 
 ### 1.1 程序说明:
 
-​    本程序用于处理命令文件夹中的文件,处理完毕后将文件移除。之后程序监控命令文件夹,当命令文件夹下出现文件时重复上述步骤,直到命令文件夹下没有文件为止。程序启动时,用户需要在配置文件中指定命令文件夹的路径,之后用户只需要将编辑好的命令文件放在命令文件夹中即可。
+​    本程序用于监控并处理命令文件夹中的文件。当程序发现命令文件夹下存在新创建的文件或已更新的文件时,程序将加载这些文件并执行文件中的命令,之后程序将处理过的文件名记录在命令文件夹同级目录的 .history 文件中。 .history 的数据格式如下:
+
+```json
+[
+  {
+        "name":"sample.json",	//已处理过文件的文件名
+        "loadTime":1594362344 //处理文件时的时间戳
+    }]
+```
 
 ### 1.2 命令文件格式及含义:
 

+ 10 - 1
tools/kubeedge/README.md

@@ -2,7 +2,15 @@
 
 ### 1.1 Program description:
 
-This program is used to process the files in the command folder and remove the files after processing. After that, the program monitors the command folder and repeats the above steps when there are files in the command folder until there are no files in the command folder. When the program starts, the user needs to specify the path of the command folder in the configuration file, and then the user only needs to place the edited command file in the command folder.
+​    本程序用于监控并处理命令文件夹中的文件。当程序发现命令文件夹下存在新创建的文件或已更新的文件时,程序将加载这些文件并执行文件中的命令,之后程序将处理过的文件名记录在命令文件夹同级目录的 .history 文件中。 .history 的数据格式如下:
+
+```json
+[
+  {
+        "name":"sample.json",	//已处理过文件的文件名
+        "loadTime":1594362344 //处理文件时的时间戳
+    }]
+```
 
 ### 1.2 Command file format and meaning:
 
@@ -59,6 +67,7 @@ Execute the command of `go build -o tools/kubeedge/kubeedge tools/kubeedge/main.
 ```
 
 ### 2.3 Get stream1
+
 ```json
 {
     "commands":[

+ 26 - 3
tools/kubeedge/common/conf.go

@@ -2,6 +2,7 @@ package common
 
 import (
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"github.com/go-yaml/yaml"
 	"github.com/sirupsen/logrus"
@@ -148,9 +149,11 @@ func fetchContents(request *http.Request) (data []byte, err error) {
 	if nil != err {
 		return nil, err
 	}
-	if respon.StatusCode < 200 || respon.StatusCode > 299 {
-		return data, fmt.Errorf("http return code: %d and error message %s.", respon.StatusCode, string(data))
-	}
+	/*
+		if respon.StatusCode < 200 || respon.StatusCode > 299 {
+			return data, fmt.Errorf("http return code: %d and error message %s.", respon.StatusCode, string(data))
+		}
+	*/
 	return data, err
 }
 
@@ -178,3 +181,23 @@ func Delete(inUrl string) (data []byte, err error) {
 	}
 	return fetchContents(request)
 }
+
+func LoadFileUnmarshal(path string, ret interface{}) error {
+	sliByte, err := ioutil.ReadFile(path)
+	if nil != err {
+		return err
+	}
+	err = json.Unmarshal(sliByte, ret)
+	if nil != err {
+		return err
+	}
+	return nil
+}
+
+func SaveFileMarshal(path string, content interface{}) error {
+	data, err := json.Marshal(content)
+	if nil != err {
+		return err
+	}
+	return ioutil.WriteFile(path, data, 0666)
+}

+ 1 - 1
tools/kubeedge/conf/cf.yaml

@@ -1,6 +1,6 @@
 port: 9081
 timeout: 500
-intervalTime: 60
+intervalTime: 30
 ip: "127.0.0.1"
 logPath: "./log/kubeedge.log"
 commandDir: "./sample/"

+ 0 - 2
tools/kubeedge/sample/sample.json

@@ -95,5 +95,3 @@
             "method":"get"
         }]
 }
-
-

+ 69 - 23
tools/kubeedge/util/util.go

@@ -7,7 +7,6 @@ import (
 	"io/ioutil"
 	"os"
 	"path"
-	"strconv"
 	"time"
 )
 
@@ -56,18 +55,23 @@ func (this *command) call(host string) bool {
 }
 
 type (
+	historyFile struct {
+		Name     string `json:"name"`
+		LoadTime int64  `json:"loadTime"`
+	}
 	server struct {
-		dirCommand string
-		dirHistory string
-		logs       []string
+		dirCommand     string
+		fileHistory    string
+		mapHistoryFile map[string]*historyFile
+		logs           []string
 	}
 )
 
-func (this *server) setDirCommand(dir string) {
-	this.dirCommand = dir
+func (this *historyFile) setName(name string) {
+	this.Name = name
 }
-func (this *server) setDirHistory(dir string) {
-	this.dirHistory = dir
+func (this *historyFile) setLoadTime(loadTime int64) {
+	this.LoadTime = loadTime
 }
 
 func (this *server) getLogs() []string {
@@ -80,19 +84,59 @@ func (this *server) printLogs() {
 	this.logs = this.logs[:0]
 }
 
+func (this *server) loadHistoryFile() bool {
+	var sli []*historyFile
+	if err := common.LoadFileUnmarshal(this.fileHistory, &sli); nil != err {
+		common.Log.Info(err)
+		return false
+	}
+	for _, v := range sli {
+		this.mapHistoryFile[v.Name] = v
+	}
+	return true
+}
+
 func (this *server) init() bool {
+	this.mapHistoryFile = make(map[string]*historyFile)
 	conf := common.GetConf()
 	dirCommand := conf.GetCommandDir()
-	dirHistory := path.Join(path.Dir(dirCommand), ".history")
-	if err := os.MkdirAll(dirHistory, 0755); nil != err {
-		this.logs = append(this.logs, fmt.Sprintf("mkdir history dir:%v", err))
+	this.dirCommand = dirCommand
+	this.fileHistory = path.Join(path.Dir(dirCommand), ".history")
+	if _, err := os.Stat(this.fileHistory); os.IsNotExist(err) {
+		if _, err = os.Create(this.fileHistory); nil != err {
+			common.Log.Info(err)
+			return false
+		}
+		return true
+	}
+	return this.loadHistoryFile()
+}
+
+func (this *server) saveHistoryFile() bool {
+	var sli []*historyFile
+	for _, v := range this.mapHistoryFile {
+		sli = append(sli, v)
+	}
+	err := common.SaveFileMarshal(this.fileHistory, sli)
+	if nil != err {
+		common.Log.Info(err)
 		return false
 	}
-	this.dirCommand = dirCommand
-	this.dirHistory = dirHistory
 	return true
 }
 
+func (this *server) isUpdate(info os.FileInfo) bool {
+	v := this.mapHistoryFile[info.Name()]
+	if nil == v {
+		return true
+	}
+
+	if v.LoadTime < info.ModTime().Unix() {
+		return true
+	}
+	return false
+}
+
 func (this *server) processDir() bool {
 	infos, err := ioutil.ReadDir(this.dirCommand)
 	if nil != err {
@@ -102,30 +146,32 @@ func (this *server) processDir() bool {
 	conf := common.GetConf()
 	host := fmt.Sprintf(`http://%s:%d`, conf.GetIp(), conf.GetPort())
 	for _, info := range infos {
+		if !this.isUpdate(info) {
+			continue
+		}
+
+		hisFile := new(historyFile)
+		hisFile.setName(info.Name())
+		hisFile.setLoadTime(time.Now().Unix())
+		this.mapHistoryFile[info.Name()] = hisFile
+
 		filePath := path.Join(this.dirCommand, info.Name())
 		file := new(fileData)
-		sliByte, err := ioutil.ReadFile(filePath)
+		err = common.LoadFileUnmarshal(filePath, file)
 		if nil != err {
 			this.logs = append(this.logs, fmt.Sprintf("load command file:%v", err))
 			return false
 		}
-		err = json.Unmarshal(sliByte, file)
-		if nil != err {
-			this.logs = append(this.logs, fmt.Sprintf("unmarshal command file:%v", err))
-			return false
-		}
 
 		for _, command := range file.Commands {
 			flag := command.call(host)
 			this.logs = append(this.logs, command.getLog())
 			if !flag {
-				return false
+				break
 			}
 		}
-		newFileName := info.Name() + "_" + strconv.FormatInt(time.Now().Unix(), 10)
-		newFilePath := path.Join(this.dirHistory, newFileName)
-		os.Rename(filePath, newFilePath)
 	}
+	this.saveHistoryFile()
 	return true
 }