util.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package util
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. kconf "github.com/lf-edge/ekuiper/tools/kubernetes/conf"
  19. "io/ioutil"
  20. "os"
  21. "path"
  22. "strings"
  23. "time"
  24. )
  25. type (
  26. command struct {
  27. Url string `json:"url"`
  28. Description string `json:"description"`
  29. Method string `json:"method"`
  30. Data interface{} `json:"data"`
  31. strLog string
  32. }
  33. fileData struct {
  34. Commands []*command `json:"commands"`
  35. }
  36. )
  37. func (c *command) getLog() string {
  38. return c.strLog
  39. }
  40. func (c *command) call(host string) bool {
  41. var resp []byte
  42. var err error
  43. head := host + c.Url
  44. body, _ := json.Marshal(c.Data)
  45. switch c.Method {
  46. case "post", "POST":
  47. resp, err = kconf.Post(head, string(body))
  48. break
  49. case "get", "GET":
  50. resp, err = kconf.Get(head)
  51. break
  52. case "delete", "DELETE":
  53. resp, err = kconf.Delete(head)
  54. break
  55. case "put", "PUT":
  56. resp, err = kconf.Put(head, string(body))
  57. break
  58. default:
  59. c.strLog = fmt.Sprintf("no such method : %s", c.Method)
  60. return false
  61. }
  62. if nil == err {
  63. c.strLog = fmt.Sprintf("%s:%s resp:%s", head, c.Method, string(resp))
  64. return true
  65. }
  66. c.strLog = fmt.Sprintf("%s:%s resp:%s err:%v", head, c.Method, string(resp), err)
  67. return false
  68. }
  69. type (
  70. historyFile struct {
  71. Name string `json:"name"`
  72. LoadTime int64 `json:"loadTime"`
  73. }
  74. server struct {
  75. dirCommand string
  76. fileHistory string
  77. mapHistoryFile map[string]*historyFile
  78. logs []string
  79. }
  80. )
  81. func (f *historyFile) setName(name string) {
  82. f.Name = name
  83. }
  84. func (f *historyFile) setLoadTime(loadTime int64) {
  85. f.LoadTime = loadTime
  86. }
  87. func (s *server) getLogs() []string {
  88. return s.logs
  89. }
  90. func (s *server) printLogs() {
  91. for _, v := range s.logs {
  92. kconf.Log.Info(v)
  93. }
  94. s.logs = s.logs[:0]
  95. }
  96. func (s *server) loadHistoryFile() bool {
  97. var sli []*historyFile
  98. if err := kconf.LoadFileUnmarshal(s.fileHistory, &sli); nil != err {
  99. kconf.Log.Info(err)
  100. return false
  101. }
  102. for _, v := range sli {
  103. s.mapHistoryFile[v.Name] = v
  104. }
  105. return true
  106. }
  107. func (s *server) init() bool {
  108. s.mapHistoryFile = make(map[string]*historyFile)
  109. conf := kconf.GetConf()
  110. dirCommand := conf.GetCommandDir()
  111. s.dirCommand = dirCommand
  112. s.fileHistory = path.Join(path.Dir(dirCommand), ".history")
  113. if _, err := os.Stat(s.fileHistory); os.IsNotExist(err) {
  114. if _, err = os.Create(s.fileHistory); nil != err {
  115. kconf.Log.Info(err)
  116. return false
  117. }
  118. return true
  119. }
  120. return s.loadHistoryFile()
  121. }
  122. func (s *server) saveHistoryFile() bool {
  123. var sli []*historyFile
  124. for _, v := range s.mapHistoryFile {
  125. sli = append(sli, v)
  126. }
  127. err := kconf.SaveFileMarshal(s.fileHistory, sli)
  128. if nil != err {
  129. kconf.Log.Info(err)
  130. return false
  131. }
  132. return true
  133. }
  134. func (s *server) isUpdate(info os.FileInfo) bool {
  135. v := s.mapHistoryFile[info.Name()]
  136. if nil == v {
  137. return true
  138. }
  139. if v.LoadTime < info.ModTime().Unix() {
  140. return true
  141. }
  142. return false
  143. }
  144. func (s *server) processDir() bool {
  145. infos, err := ioutil.ReadDir(s.dirCommand)
  146. if nil != err {
  147. s.logs = append(s.logs, fmt.Sprintf("read command dir:%v", err))
  148. return false
  149. }
  150. conf := kconf.GetConf()
  151. host := fmt.Sprintf(`http://%s:%d`, conf.GetIp(), conf.GetPort())
  152. for _, info := range infos {
  153. if !strings.HasSuffix(info.Name(), ".json") {
  154. continue
  155. }
  156. if !s.isUpdate(info) {
  157. continue
  158. }
  159. hisFile := new(historyFile)
  160. hisFile.setName(info.Name())
  161. hisFile.setLoadTime(time.Now().Unix())
  162. s.mapHistoryFile[info.Name()] = hisFile
  163. filePath := path.Join(s.dirCommand, info.Name())
  164. file := new(fileData)
  165. err = kconf.LoadFileUnmarshal(filePath, file)
  166. if nil != err {
  167. s.logs = append(s.logs, fmt.Sprintf("load command file:%v", err))
  168. return false
  169. }
  170. for _, command := range file.Commands {
  171. flag := command.call(host)
  172. s.logs = append(s.logs, command.getLog())
  173. if !flag {
  174. break
  175. }
  176. }
  177. }
  178. s.saveHistoryFile()
  179. return true
  180. }
  181. func (s *server) watchFolders() {
  182. conf := kconf.GetConf()
  183. s.processDir()
  184. s.printLogs()
  185. chTime := time.Tick(time.Second * time.Duration(conf.GetIntervalTime()))
  186. for {
  187. select {
  188. case <-chTime:
  189. s.processDir()
  190. s.printLogs()
  191. }
  192. }
  193. }
  194. func Process() {
  195. if len(os.Args) != 2 {
  196. kconf.Log.Fatal("Missing configuration file")
  197. return
  198. }
  199. conf := kconf.GetConf()
  200. if !conf.Init() {
  201. return
  202. }
  203. se := new(server)
  204. if !se.init() {
  205. se.printLogs()
  206. return
  207. }
  208. fmt.Println("Kuiper kubernetes tool is started successfully!")
  209. se.watchFolders()
  210. }