util.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package util
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "strings"
  9. "time"
  10. "github.com/emqx/kuiper/tools/kubernetes/common"
  11. )
  12. type (
  13. command struct {
  14. Url string `json:"url"`
  15. Description string `json:"description"`
  16. Method string `json:"method"`
  17. Data interface{} `json:"data"`
  18. strLog string
  19. }
  20. fileData struct {
  21. Commands []*command `json:"commands"`
  22. }
  23. )
  24. func (this *command) getLog() string {
  25. return this.strLog
  26. }
  27. func (this *command) call(host string) bool {
  28. var resp []byte
  29. var err error
  30. head := host + this.Url
  31. body, _ := json.Marshal(this.Data)
  32. switch this.Method {
  33. case "post", "POST":
  34. resp, err = common.Post(head, string(body))
  35. break
  36. case "get", "GET":
  37. resp, err = common.Get(head)
  38. break
  39. case "delete", "DELETE":
  40. resp, err = common.Delete(head)
  41. break
  42. default:
  43. this.strLog = fmt.Sprintf("no such method : %s", this.Method)
  44. return false
  45. }
  46. if nil == err {
  47. this.strLog = fmt.Sprintf("%s:%s resp:%s", head, this.Method, string(resp))
  48. return true
  49. }
  50. this.strLog = fmt.Sprintf("%s:%s resp:%s err:%v", head, this.Method, string(resp), err)
  51. return false
  52. }
  53. type (
  54. historyFile struct {
  55. Name string `json:"name"`
  56. LoadTime int64 `json:"loadTime"`
  57. }
  58. server struct {
  59. dirCommand string
  60. fileHistory string
  61. mapHistoryFile map[string]*historyFile
  62. logs []string
  63. }
  64. )
  65. func (this *historyFile) setName(name string) {
  66. this.Name = name
  67. }
  68. func (this *historyFile) setLoadTime(loadTime int64) {
  69. this.LoadTime = loadTime
  70. }
  71. func (this *server) getLogs() []string {
  72. return this.logs
  73. }
  74. func (this *server) printLogs() {
  75. for _, v := range this.logs {
  76. common.Log.Info(v)
  77. }
  78. this.logs = this.logs[:0]
  79. }
  80. func (this *server) loadHistoryFile() bool {
  81. var sli []*historyFile
  82. if err := common.LoadFileUnmarshal(this.fileHistory, &sli); nil != err {
  83. common.Log.Info(err)
  84. return false
  85. }
  86. for _, v := range sli {
  87. this.mapHistoryFile[v.Name] = v
  88. }
  89. return true
  90. }
  91. func (this *server) init() bool {
  92. this.mapHistoryFile = make(map[string]*historyFile)
  93. conf := common.GetConf()
  94. dirCommand := conf.GetCommandDir()
  95. this.dirCommand = dirCommand
  96. this.fileHistory = path.Join(path.Dir(dirCommand), ".history")
  97. if _, err := os.Stat(this.fileHistory); os.IsNotExist(err) {
  98. if _, err = os.Create(this.fileHistory); nil != err {
  99. common.Log.Info(err)
  100. return false
  101. }
  102. return true
  103. }
  104. return this.loadHistoryFile()
  105. }
  106. func (this *server) saveHistoryFile() bool {
  107. var sli []*historyFile
  108. for _, v := range this.mapHistoryFile {
  109. sli = append(sli, v)
  110. }
  111. err := common.SaveFileMarshal(this.fileHistory, sli)
  112. if nil != err {
  113. common.Log.Info(err)
  114. return false
  115. }
  116. return true
  117. }
  118. func (this *server) isUpdate(info os.FileInfo) bool {
  119. v := this.mapHistoryFile[info.Name()]
  120. if nil == v {
  121. return true
  122. }
  123. if v.LoadTime < info.ModTime().Unix() {
  124. return true
  125. }
  126. return false
  127. }
  128. func (this *server) processDir() bool {
  129. infos, err := ioutil.ReadDir(this.dirCommand)
  130. if nil != err {
  131. this.logs = append(this.logs, fmt.Sprintf("read command dir:%v", err))
  132. return false
  133. }
  134. conf := common.GetConf()
  135. host := fmt.Sprintf(`http://%s:%d`, conf.GetIp(), conf.GetPort())
  136. for _, info := range infos {
  137. if !strings.HasSuffix(info.Name(), ".json") {
  138. continue
  139. }
  140. if !this.isUpdate(info) {
  141. continue
  142. }
  143. hisFile := new(historyFile)
  144. hisFile.setName(info.Name())
  145. hisFile.setLoadTime(time.Now().Unix())
  146. this.mapHistoryFile[info.Name()] = hisFile
  147. filePath := path.Join(this.dirCommand, info.Name())
  148. file := new(fileData)
  149. err = common.LoadFileUnmarshal(filePath, file)
  150. if nil != err {
  151. this.logs = append(this.logs, fmt.Sprintf("load command file:%v", err))
  152. return false
  153. }
  154. for _, command := range file.Commands {
  155. flag := command.call(host)
  156. this.logs = append(this.logs, command.getLog())
  157. if !flag {
  158. break
  159. }
  160. }
  161. }
  162. this.saveHistoryFile()
  163. return true
  164. }
  165. func (this *server) watchFolders() {
  166. conf := common.GetConf()
  167. this.processDir()
  168. this.printLogs()
  169. chTime := time.Tick(time.Second * time.Duration(conf.GetIntervalTime()))
  170. for {
  171. select {
  172. case <-chTime:
  173. this.processDir()
  174. this.printLogs()
  175. }
  176. }
  177. }
  178. func Process() {
  179. if len(os.Args) != 2 {
  180. common.Log.Fatal("Missing configuration file")
  181. return
  182. }
  183. conf := common.GetConf()
  184. if !conf.Init() {
  185. return
  186. }
  187. se := new(server)
  188. if !se.init() {
  189. se.printLogs()
  190. return
  191. }
  192. fmt.Println("Kuiper kubernetes tool is started successfully!")
  193. se.watchFolders()
  194. }