util.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package util
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "net/http"
  19. "os"
  20. "path"
  21. "strings"
  22. "time"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. kconf "github.com/lf-edge/ekuiper/tools/kubernetes/conf"
  25. )
  26. type (
  27. command struct {
  28. Url string `json:"url"`
  29. Description string `json:"description"`
  30. Method string `json:"method"`
  31. Data interface{} `json:"data"`
  32. strLog string
  33. }
  34. fileData struct {
  35. Commands []*command `json:"commands"`
  36. }
  37. )
  38. func (c *command) getLog() string {
  39. return c.strLog
  40. }
  41. func (c *command) call(host string) bool {
  42. var resp []byte
  43. var err error
  44. head := host + c.Url
  45. body, _ := json.Marshal(c.Data)
  46. switch strings.ToUpper(c.Method) {
  47. case http.MethodPost:
  48. resp, err = kconf.Post(head, string(body))
  49. break
  50. case http.MethodGet:
  51. resp, err = kconf.Get(head)
  52. break
  53. case http.MethodDelete:
  54. resp, err = kconf.Delete(head)
  55. break
  56. case http.MethodPut:
  57. resp, err = kconf.Put(head, string(body))
  58. break
  59. default:
  60. c.strLog = fmt.Sprintf("no such method : %s", c.Method)
  61. return false
  62. }
  63. if nil == err {
  64. c.strLog = fmt.Sprintf("%s:%s resp:%s", head, c.Method, string(resp))
  65. return true
  66. }
  67. c.strLog = fmt.Sprintf("%s:%s resp:%s err:%v", head, c.Method, string(resp), err)
  68. return false
  69. }
  70. type (
  71. historyFile struct {
  72. Name string `json:"name"`
  73. LoadTime int64 `json:"loadTime"`
  74. }
  75. server struct {
  76. dirCommand string
  77. fileHistory string
  78. mapHistoryFile map[string]*historyFile
  79. logs []string
  80. }
  81. )
  82. func (f *historyFile) setName(name string) {
  83. f.Name = name
  84. }
  85. func (f *historyFile) setLoadTime(loadTime int64) {
  86. f.LoadTime = loadTime
  87. }
  88. func (s *server) getLogs() []string {
  89. return s.logs
  90. }
  91. func (s *server) printLogs() {
  92. for _, v := range s.logs {
  93. kconf.Log.Info(v)
  94. }
  95. s.logs = s.logs[:0]
  96. }
  97. func (s *server) loadHistoryFile() bool {
  98. var sli []*historyFile
  99. if err := kconf.LoadFileUnmarshal(s.fileHistory, &sli); nil != err {
  100. kconf.Log.Info(err)
  101. return false
  102. }
  103. for _, v := range sli {
  104. s.mapHistoryFile[v.Name] = v
  105. }
  106. return true
  107. }
  108. func (s *server) init() bool {
  109. s.mapHistoryFile = make(map[string]*historyFile)
  110. conf := kconf.GetConf()
  111. dirCommand := conf.GetCommandDir()
  112. s.dirCommand = dirCommand
  113. s.fileHistory = path.Join(path.Dir(dirCommand), ".history")
  114. if _, err := os.Stat(s.fileHistory); os.IsNotExist(err) {
  115. if _, err = os.Create(s.fileHistory); nil != err {
  116. kconf.Log.Info(err)
  117. return false
  118. }
  119. return true
  120. }
  121. return s.loadHistoryFile()
  122. }
  123. func (s *server) saveHistoryFile() bool {
  124. var sli []*historyFile
  125. for _, v := range s.mapHistoryFile {
  126. sli = append(sli, v)
  127. }
  128. err := kconf.SaveFileMarshal(s.fileHistory, sli)
  129. if nil != err {
  130. kconf.Log.Info(err)
  131. return false
  132. }
  133. return true
  134. }
  135. func (s *server) isUpdate(entry os.DirEntry) bool {
  136. v := s.mapHistoryFile[entry.Name()]
  137. if nil == v {
  138. return true
  139. }
  140. info, err := entry.Info()
  141. if err != nil {
  142. return false
  143. }
  144. if v.LoadTime < info.ModTime().Unix() {
  145. return true
  146. }
  147. return false
  148. }
  149. func (s *server) processDir() bool {
  150. dirEntries, err := os.ReadDir(s.dirCommand)
  151. if nil != err {
  152. s.logs = append(s.logs, fmt.Sprintf("read command dir:%v", err))
  153. return false
  154. }
  155. conf := kconf.GetConf()
  156. host := "http://" + cast.JoinHostPortInt(conf.GetIp(), conf.GetPort())
  157. for _, entry := range dirEntries {
  158. if !strings.HasSuffix(entry.Name(), ".json") {
  159. continue
  160. }
  161. if !s.isUpdate(entry) {
  162. continue
  163. }
  164. hisFile := new(historyFile)
  165. hisFile.setName(entry.Name())
  166. hisFile.setLoadTime(time.Now().Unix())
  167. s.mapHistoryFile[entry.Name()] = hisFile
  168. filePath := path.Join(s.dirCommand, entry.Name())
  169. file := new(fileData)
  170. err = kconf.LoadFileUnmarshal(filePath, file)
  171. if nil != err {
  172. s.logs = append(s.logs, fmt.Sprintf("load command file:%v", err))
  173. return false
  174. }
  175. for _, command := range file.Commands {
  176. flag := command.call(host)
  177. s.logs = append(s.logs, command.getLog())
  178. if !flag {
  179. break
  180. }
  181. }
  182. }
  183. s.saveHistoryFile()
  184. return true
  185. }
  186. func (s *server) watchFolders() {
  187. conf := kconf.GetConf()
  188. s.processDir()
  189. s.printLogs()
  190. chTimer := time.NewTicker(time.Second * time.Duration(conf.GetIntervalTime()))
  191. defer chTimer.Stop()
  192. for {
  193. select {
  194. case <-chTimer.C:
  195. s.processDir()
  196. s.printLogs()
  197. }
  198. }
  199. }
  200. func Process() {
  201. if len(os.Args) != 2 {
  202. fmt.Println("Missing configuration file")
  203. return
  204. }
  205. conf := kconf.GetConf()
  206. if !conf.Init() {
  207. return
  208. }
  209. se := new(server)
  210. if !se.init() {
  211. se.printLogs()
  212. return
  213. }
  214. fmt.Println("Kuiper kubernetes tool is started successfully!")
  215. se.watchFolders()
  216. }