util.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package util
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "net/http"
  19. "os"
  20. "path"
  21. "strings"
  22. "time"
  23. kconf "github.com/lf-edge/ekuiper/tools/kubernetes/conf"
  24. )
  25. type (
  26. command struct {
  27. Url string `json:"url"`
  28. Description string `json:"description"`
  29. Method string `json:"method"`
  30. Data interface{} `json:"data"`
  31. strLog string
  32. }
  33. fileData struct {
  34. Commands []*command `json:"commands"`
  35. }
  36. )
  37. func (c *command) getLog() string {
  38. return c.strLog
  39. }
  40. func (c *command) call(host string) bool {
  41. var resp []byte
  42. var err error
  43. head := host + c.Url
  44. body, _ := json.Marshal(c.Data)
  45. switch strings.ToUpper(c.Method) {
  46. case http.MethodPost:
  47. resp, err = kconf.Post(head, string(body))
  48. break
  49. case http.MethodGet:
  50. resp, err = kconf.Get(head)
  51. break
  52. case http.MethodDelete:
  53. resp, err = kconf.Delete(head)
  54. break
  55. case http.MethodPut:
  56. resp, err = kconf.Put(head, string(body))
  57. break
  58. default:
  59. c.strLog = fmt.Sprintf("no such method : %s", c.Method)
  60. return false
  61. }
  62. if nil == err {
  63. c.strLog = fmt.Sprintf("%s:%s resp:%s", head, c.Method, string(resp))
  64. return true
  65. }
  66. c.strLog = fmt.Sprintf("%s:%s resp:%s err:%v", head, c.Method, string(resp), err)
  67. return false
  68. }
  69. type (
  70. historyFile struct {
  71. Name string `json:"name"`
  72. LoadTime int64 `json:"loadTime"`
  73. }
  74. server struct {
  75. dirCommand string
  76. fileHistory string
  77. mapHistoryFile map[string]*historyFile
  78. logs []string
  79. }
  80. )
  81. func (f *historyFile) setName(name string) {
  82. f.Name = name
  83. }
  84. func (f *historyFile) setLoadTime(loadTime int64) {
  85. f.LoadTime = loadTime
  86. }
  87. func (s *server) getLogs() []string {
  88. return s.logs
  89. }
  90. func (s *server) printLogs() {
  91. for _, v := range s.logs {
  92. kconf.Log.Info(v)
  93. }
  94. s.logs = s.logs[:0]
  95. }
  96. func (s *server) loadHistoryFile() bool {
  97. var sli []*historyFile
  98. if err := kconf.LoadFileUnmarshal(s.fileHistory, &sli); nil != err {
  99. kconf.Log.Info(err)
  100. return false
  101. }
  102. for _, v := range sli {
  103. s.mapHistoryFile[v.Name] = v
  104. }
  105. return true
  106. }
  107. func (s *server) init() bool {
  108. s.mapHistoryFile = make(map[string]*historyFile)
  109. conf := kconf.GetConf()
  110. dirCommand := conf.GetCommandDir()
  111. s.dirCommand = dirCommand
  112. s.fileHistory = path.Join(path.Dir(dirCommand), ".history")
  113. if _, err := os.Stat(s.fileHistory); os.IsNotExist(err) {
  114. if _, err = os.Create(s.fileHistory); nil != err {
  115. kconf.Log.Info(err)
  116. return false
  117. }
  118. return true
  119. }
  120. return s.loadHistoryFile()
  121. }
  122. func (s *server) saveHistoryFile() bool {
  123. var sli []*historyFile
  124. for _, v := range s.mapHistoryFile {
  125. sli = append(sli, v)
  126. }
  127. err := kconf.SaveFileMarshal(s.fileHistory, sli)
  128. if nil != err {
  129. kconf.Log.Info(err)
  130. return false
  131. }
  132. return true
  133. }
  134. func (s *server) isUpdate(entry os.DirEntry) bool {
  135. v := s.mapHistoryFile[entry.Name()]
  136. if nil == v {
  137. return true
  138. }
  139. info, err := entry.Info()
  140. if err != nil {
  141. return false
  142. }
  143. if v.LoadTime < info.ModTime().Unix() {
  144. return true
  145. }
  146. return false
  147. }
  148. func (s *server) processDir() bool {
  149. dirEntries, err := os.ReadDir(s.dirCommand)
  150. if nil != err {
  151. s.logs = append(s.logs, fmt.Sprintf("read command dir:%v", err))
  152. return false
  153. }
  154. conf := kconf.GetConf()
  155. host := fmt.Sprintf(`http://%s:%d`, conf.GetIp(), conf.GetPort())
  156. for _, entry := range dirEntries {
  157. if !strings.HasSuffix(entry.Name(), ".json") {
  158. continue
  159. }
  160. if !s.isUpdate(entry) {
  161. continue
  162. }
  163. hisFile := new(historyFile)
  164. hisFile.setName(entry.Name())
  165. hisFile.setLoadTime(time.Now().Unix())
  166. s.mapHistoryFile[entry.Name()] = hisFile
  167. filePath := path.Join(s.dirCommand, entry.Name())
  168. file := new(fileData)
  169. err = kconf.LoadFileUnmarshal(filePath, file)
  170. if nil != err {
  171. s.logs = append(s.logs, fmt.Sprintf("load command file:%v", err))
  172. return false
  173. }
  174. for _, command := range file.Commands {
  175. flag := command.call(host)
  176. s.logs = append(s.logs, command.getLog())
  177. if !flag {
  178. break
  179. }
  180. }
  181. }
  182. s.saveHistoryFile()
  183. return true
  184. }
  185. func (s *server) watchFolders() {
  186. conf := kconf.GetConf()
  187. s.processDir()
  188. s.printLogs()
  189. chTimer := time.NewTicker(time.Second * time.Duration(conf.GetIntervalTime()))
  190. defer chTimer.Stop()
  191. for {
  192. select {
  193. case <-chTimer.C:
  194. s.processDir()
  195. s.printLogs()
  196. }
  197. }
  198. }
  199. func Process() {
  200. if len(os.Args) != 2 {
  201. fmt.Println("Missing configuration file")
  202. return
  203. }
  204. conf := kconf.GetConf()
  205. if !conf.Init() {
  206. return
  207. }
  208. se := new(server)
  209. if !se.init() {
  210. se.printLogs()
  211. return
  212. }
  213. fmt.Println("Kuiper kubernetes tool is started successfully!")
  214. se.watchFolders()
  215. }