util.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package util
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "os"
  19. "path"
  20. "strings"
  21. "time"
  22. kconf "github.com/lf-edge/ekuiper/tools/kubernetes/conf"
  23. )
  // command describes a single REST request read from a command file.
  // The exported fields are filled from JSON; strLog is set by call and
  // holds the log line describing the most recent execution.
  type command struct {
  	Url         string      `json:"url"`         // path appended to the host address
  	Description string      `json:"description"` // free-form text; not read in this file
  	Method      string      `json:"method"`      // request method name, e.g. "post" or "GET"
  	Data        interface{} `json:"data"`        // payload, JSON-encoded before sending
  	strLog      string
  }

  // fileData is the top-level layout of a command file: the list of
  // commands stored under the "commands" key.
  type fileData struct {
  	Commands []*command `json:"commands"`
  }
  36. func (c *command) getLog() string {
  37. return c.strLog
  38. }
  39. func (c *command) call(host string) bool {
  40. var resp []byte
  41. var err error
  42. head := host + c.Url
  43. body, _ := json.Marshal(c.Data)
  44. switch c.Method {
  45. case "post", "POST":
  46. resp, err = kconf.Post(head, string(body))
  47. break
  48. case "get", "GET":
  49. resp, err = kconf.Get(head)
  50. break
  51. case "delete", "DELETE":
  52. resp, err = kconf.Delete(head)
  53. break
  54. case "put", "PUT":
  55. resp, err = kconf.Put(head, string(body))
  56. break
  57. default:
  58. c.strLog = fmt.Sprintf("no such method : %s", c.Method)
  59. return false
  60. }
  61. if nil == err {
  62. c.strLog = fmt.Sprintf("%s:%s resp:%s", head, c.Method, string(resp))
  63. return true
  64. }
  65. c.strLog = fmt.Sprintf("%s:%s resp:%s err:%v", head, c.Method, string(resp), err)
  66. return false
  67. }
  // historyFile is one record of the history file: the name of a command
  // file and the Unix time (seconds) at which it was last picked up.
  type historyFile struct {
  	Name     string `json:"name"`
  	LoadTime int64  `json:"loadTime"`
  }

  // server holds the state of the directory watcher.
  type server struct {
  	dirCommand     string                  // directory scanned for command files
  	fileHistory    string                  // path of the history file
  	mapHistoryFile map[string]*historyFile // history records keyed by file name
  	logs           []string                // buffered log lines, flushed by printLogs
  }
  80. func (f *historyFile) setName(name string) {
  81. f.Name = name
  82. }
  83. func (f *historyFile) setLoadTime(loadTime int64) {
  84. f.LoadTime = loadTime
  85. }
  86. func (s *server) getLogs() []string {
  87. return s.logs
  88. }
  89. func (s *server) printLogs() {
  90. for _, v := range s.logs {
  91. kconf.Log.Info(v)
  92. }
  93. s.logs = s.logs[:0]
  94. }
  95. func (s *server) loadHistoryFile() bool {
  96. var sli []*historyFile
  97. if err := kconf.LoadFileUnmarshal(s.fileHistory, &sli); nil != err {
  98. kconf.Log.Info(err)
  99. return false
  100. }
  101. for _, v := range sli {
  102. s.mapHistoryFile[v.Name] = v
  103. }
  104. return true
  105. }
  106. func (s *server) init() bool {
  107. s.mapHistoryFile = make(map[string]*historyFile)
  108. conf := kconf.GetConf()
  109. dirCommand := conf.GetCommandDir()
  110. s.dirCommand = dirCommand
  111. s.fileHistory = path.Join(path.Dir(dirCommand), ".history")
  112. if _, err := os.Stat(s.fileHistory); os.IsNotExist(err) {
  113. if _, err = os.Create(s.fileHistory); nil != err {
  114. kconf.Log.Info(err)
  115. return false
  116. }
  117. return true
  118. }
  119. return s.loadHistoryFile()
  120. }
  121. func (s *server) saveHistoryFile() bool {
  122. var sli []*historyFile
  123. for _, v := range s.mapHistoryFile {
  124. sli = append(sli, v)
  125. }
  126. err := kconf.SaveFileMarshal(s.fileHistory, sli)
  127. if nil != err {
  128. kconf.Log.Info(err)
  129. return false
  130. }
  131. return true
  132. }
  133. func (s *server) isUpdate(entry os.DirEntry) bool {
  134. v := s.mapHistoryFile[entry.Name()]
  135. if nil == v {
  136. return true
  137. }
  138. info, err := entry.Info()
  139. if err != nil {
  140. return false
  141. }
  142. if v.LoadTime < info.ModTime().Unix() {
  143. return true
  144. }
  145. return false
  146. }
  147. func (s *server) processDir() bool {
  148. dirEntries, err := os.ReadDir(s.dirCommand)
  149. if nil != err {
  150. s.logs = append(s.logs, fmt.Sprintf("read command dir:%v", err))
  151. return false
  152. }
  153. conf := kconf.GetConf()
  154. host := fmt.Sprintf(`http://%s:%d`, conf.GetIp(), conf.GetPort())
  155. for _, entry := range dirEntries {
  156. if !strings.HasSuffix(entry.Name(), ".json") {
  157. continue
  158. }
  159. if !s.isUpdate(entry) {
  160. continue
  161. }
  162. hisFile := new(historyFile)
  163. hisFile.setName(entry.Name())
  164. hisFile.setLoadTime(time.Now().Unix())
  165. s.mapHistoryFile[entry.Name()] = hisFile
  166. filePath := path.Join(s.dirCommand, entry.Name())
  167. file := new(fileData)
  168. err = kconf.LoadFileUnmarshal(filePath, file)
  169. if nil != err {
  170. s.logs = append(s.logs, fmt.Sprintf("load command file:%v", err))
  171. return false
  172. }
  173. for _, command := range file.Commands {
  174. flag := command.call(host)
  175. s.logs = append(s.logs, command.getLog())
  176. if !flag {
  177. break
  178. }
  179. }
  180. }
  181. s.saveHistoryFile()
  182. return true
  183. }
  184. func (s *server) watchFolders() {
  185. conf := kconf.GetConf()
  186. s.processDir()
  187. s.printLogs()
  188. chTime := time.Tick(time.Second * time.Duration(conf.GetIntervalTime()))
  189. for {
  190. select {
  191. case <-chTime:
  192. s.processDir()
  193. s.printLogs()
  194. }
  195. }
  196. }
  197. func Process() {
  198. if len(os.Args) != 2 {
  199. fmt.Println("Missing configuration file")
  200. return
  201. }
  202. conf := kconf.GetConf()
  203. if !conf.Init() {
  204. return
  205. }
  206. se := new(server)
  207. if !se.init() {
  208. se.printLogs()
  209. return
  210. }
  211. fmt.Println("Kuiper kubernetes tool is started successfully!")
  212. se.watchFolders()
  213. }