util.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package util
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "strings"
  9. "time"
  10. "github.com/emqx/kuiper/tools/kubernetes/common"
  11. )
  12. type (
  13. command struct {
  14. Url string `json:"url"`
  15. Description string `json:"description"`
  16. Method string `json:"method"`
  17. Data interface{} `json:"data"`
  18. strLog string
  19. }
  20. fileData struct {
  21. Commands []*command `json:"commands"`
  22. }
  23. )
  24. func (this *command) getLog() string {
  25. return this.strLog
  26. }
  27. func (this *command) call(host string) bool {
  28. var resp []byte
  29. var err error
  30. head := host + this.Url
  31. body, _ := json.Marshal(this.Data)
  32. switch this.Method {
  33. case "post", "POST":
  34. resp, err = common.Post(head, string(body))
  35. break
  36. case "get", "GET":
  37. resp, err = common.Get(head)
  38. break
  39. case "delete", "DELETE":
  40. resp, err = common.Delete(head)
  41. break
  42. case "put", "PUT":
  43. resp, err = common.Put(head, string(body))
  44. break
  45. default:
  46. this.strLog = fmt.Sprintf("no such method : %s", this.Method)
  47. return false
  48. }
  49. if nil == err {
  50. this.strLog = fmt.Sprintf("%s:%s resp:%s", head, this.Method, string(resp))
  51. return true
  52. }
  53. this.strLog = fmt.Sprintf("%s:%s resp:%s err:%v", head, this.Method, string(resp), err)
  54. return false
  55. }
  56. type (
  57. historyFile struct {
  58. Name string `json:"name"`
  59. LoadTime int64 `json:"loadTime"`
  60. }
  61. server struct {
  62. dirCommand string
  63. fileHistory string
  64. mapHistoryFile map[string]*historyFile
  65. logs []string
  66. }
  67. )
  68. func (this *historyFile) setName(name string) {
  69. this.Name = name
  70. }
  71. func (this *historyFile) setLoadTime(loadTime int64) {
  72. this.LoadTime = loadTime
  73. }
  74. func (this *server) getLogs() []string {
  75. return this.logs
  76. }
  77. func (this *server) printLogs() {
  78. for _, v := range this.logs {
  79. common.Log.Info(v)
  80. }
  81. this.logs = this.logs[:0]
  82. }
  83. func (this *server) loadHistoryFile() bool {
  84. var sli []*historyFile
  85. if err := common.LoadFileUnmarshal(this.fileHistory, &sli); nil != err {
  86. common.Log.Info(err)
  87. return false
  88. }
  89. for _, v := range sli {
  90. this.mapHistoryFile[v.Name] = v
  91. }
  92. return true
  93. }
  94. func (this *server) init() bool {
  95. this.mapHistoryFile = make(map[string]*historyFile)
  96. conf := common.GetConf()
  97. dirCommand := conf.GetCommandDir()
  98. this.dirCommand = dirCommand
  99. this.fileHistory = path.Join(path.Dir(dirCommand), ".history")
  100. if _, err := os.Stat(this.fileHistory); os.IsNotExist(err) {
  101. if _, err = os.Create(this.fileHistory); nil != err {
  102. common.Log.Info(err)
  103. return false
  104. }
  105. return true
  106. }
  107. return this.loadHistoryFile()
  108. }
  109. func (this *server) saveHistoryFile() bool {
  110. var sli []*historyFile
  111. for _, v := range this.mapHistoryFile {
  112. sli = append(sli, v)
  113. }
  114. err := common.SaveFileMarshal(this.fileHistory, sli)
  115. if nil != err {
  116. common.Log.Info(err)
  117. return false
  118. }
  119. return true
  120. }
  121. func (this *server) isUpdate(info os.FileInfo) bool {
  122. v := this.mapHistoryFile[info.Name()]
  123. if nil == v {
  124. return true
  125. }
  126. if v.LoadTime < info.ModTime().Unix() {
  127. return true
  128. }
  129. return false
  130. }
  131. func (this *server) processDir() bool {
  132. infos, err := ioutil.ReadDir(this.dirCommand)
  133. if nil != err {
  134. this.logs = append(this.logs, fmt.Sprintf("read command dir:%v", err))
  135. return false
  136. }
  137. conf := common.GetConf()
  138. host := fmt.Sprintf(`http://%s:%d`, conf.GetIp(), conf.GetPort())
  139. for _, info := range infos {
  140. if !strings.HasSuffix(info.Name(), ".json") {
  141. continue
  142. }
  143. if !this.isUpdate(info) {
  144. continue
  145. }
  146. hisFile := new(historyFile)
  147. hisFile.setName(info.Name())
  148. hisFile.setLoadTime(time.Now().Unix())
  149. this.mapHistoryFile[info.Name()] = hisFile
  150. filePath := path.Join(this.dirCommand, info.Name())
  151. file := new(fileData)
  152. err = common.LoadFileUnmarshal(filePath, file)
  153. if nil != err {
  154. this.logs = append(this.logs, fmt.Sprintf("load command file:%v", err))
  155. return false
  156. }
  157. for _, command := range file.Commands {
  158. flag := command.call(host)
  159. this.logs = append(this.logs, command.getLog())
  160. if !flag {
  161. break
  162. }
  163. }
  164. }
  165. this.saveHistoryFile()
  166. return true
  167. }
  168. func (this *server) watchFolders() {
  169. conf := common.GetConf()
  170. this.processDir()
  171. this.printLogs()
  172. chTime := time.Tick(time.Second * time.Duration(conf.GetIntervalTime()))
  173. for {
  174. select {
  175. case <-chTime:
  176. this.processDir()
  177. this.printLogs()
  178. }
  179. }
  180. }
  181. func Process() {
  182. if len(os.Args) != 2 {
  183. common.Log.Fatal("Missing configuration file")
  184. return
  185. }
  186. conf := common.GetConf()
  187. if !conf.Init() {
  188. return
  189. }
  190. se := new(server)
  191. if !se.init() {
  192. se.printLogs()
  193. return
  194. }
  195. fmt.Println("Kuiper kubernetes tool is started successfully!")
  196. se.watchFolders()
  197. }