rule_manager.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/rule"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/errorx"
  23. "github.com/lf-edge/ekuiper/pkg/infra"
  24. "sort"
  25. "sync"
  26. "time"
  27. )
  28. // Rule storage includes kv and in memory registry
  29. // Kv stores the rule text with *expected* status so that the rule can be restored after restart
  30. // Registry stores the current rule state in runtime
  31. // Here registry is the in memory registry
  32. var registry *RuleRegistry
  33. type RuleRegistry struct {
  34. sync.RWMutex
  35. internal map[string]*rule.RuleState
  36. }
  37. // Store create the in memory entry for a rule. Run in:
  38. // 1. Restore the rules from KV at startup
  39. // 2. Restore the rules when importing
  40. // 3. Create a rule
  41. func (rr *RuleRegistry) Store(key string, value *rule.RuleState) {
  42. rr.Lock()
  43. rr.internal[key] = value
  44. rr.Unlock()
  45. }
  46. // Load the entry of a rule by id. It is used to get the current rule state
  47. // or send command to a running rule
  48. func (rr *RuleRegistry) Load(key string) (value *rule.RuleState, ok bool) {
  49. rr.RLock()
  50. result, ok := rr.internal[key]
  51. rr.RUnlock()
  52. return result, ok
  53. }
  54. // Delete Atomic get and delete. Only run when deleting a rule in runtime.
  55. func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) {
  56. rr.Lock()
  57. result, ok := rr.internal[key]
  58. if ok {
  59. delete(rr.internal, key)
  60. }
  61. rr.Unlock()
  62. return result, ok
  63. }
  64. func createRule(name, ruleJson string) (string, error) {
  65. // Validate the rule json
  66. r, err := ruleProcessor.GetRuleByJson(name, ruleJson)
  67. if err != nil {
  68. return "", fmt.Errorf("invalid rule json: %v", err)
  69. }
  70. // Store to KV
  71. err = ruleProcessor.ExecCreate(r.Id, ruleJson)
  72. if err != nil {
  73. return r.Id, fmt.Errorf("store the rule error: %v", err)
  74. }
  75. // Validate the topo
  76. rs, err := createRuleState(r)
  77. if err != nil {
  78. // Do not store to registry so also delete the KV
  79. deleteRule(r.Id)
  80. _, _ = ruleProcessor.ExecDrop(r.Id)
  81. return r.Id, fmt.Errorf("create rule topo error: %v", err)
  82. }
  83. // Start the rule asyncly
  84. if r.Triggered {
  85. go func() {
  86. panicOrError := infra.SafeRun(func() error {
  87. //Start the rule which runs async
  88. return rs.Start()
  89. })
  90. if panicOrError != nil {
  91. logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
  92. }
  93. }()
  94. }
  95. return r.Id, nil
  96. }
  97. // Create and initialize a rule state.
  98. // Errors are possible during plan the topo.
  99. // If error happens return immediately without add it to the registry
  100. func createRuleState(r *api.Rule) (*rule.RuleState, error) {
  101. rs, err := rule.NewRuleState(r)
  102. if err != nil {
  103. return rs, err
  104. }
  105. registry.Store(r.Id, rs)
  106. return rs, nil
  107. }
  108. func recoverRule(r *api.Rule) string {
  109. // Validate the topo
  110. rs, err := createRuleState(r)
  111. if err != nil { // when recovering rules, assume the rules are valid, so always add it to the registry
  112. conf.Log.Errorf("Create rule topo error: %v", err)
  113. r.Triggered = false
  114. registry.Store(r.Id, rs)
  115. }
  116. if !r.Triggered {
  117. return fmt.Sprintf("Rule %s was stopped.", r.Id)
  118. } else {
  119. panicOrError := infra.SafeRun(func() error {
  120. //Start the rule which runs async
  121. return rs.Start()
  122. })
  123. if panicOrError != nil {
  124. return fmt.Sprintf("Rule %s start failed: %s", r.Id, panicOrError)
  125. }
  126. }
  127. return fmt.Sprintf("Rule %s was started.", r.Id)
  128. }
  129. func updateRule(ruleId, ruleJson string) error {
  130. // Validate the rule json
  131. r, err := ruleProcessor.GetRuleByJson(ruleId, ruleJson)
  132. if err != nil {
  133. return fmt.Errorf("Invalid rule json: %v", err)
  134. }
  135. if rs, ok := registry.Load(r.Id); ok {
  136. rs.UpdateTopo(r)
  137. err = ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
  138. return err
  139. } else {
  140. return fmt.Errorf("Rule %s registry not found, try to delete it and recreate", r.Id)
  141. }
  142. }
  143. func deleteRule(name string) (result string) {
  144. if rs, ok := registry.Delete(name); ok {
  145. rs.Close()
  146. result = fmt.Sprintf("Rule %s was deleted.", name)
  147. } else {
  148. result = fmt.Sprintf("Rule %s was not found.", name)
  149. }
  150. return
  151. }
  152. func startRule(name string) error {
  153. rs, ok := registry.Load(name)
  154. if !ok {
  155. return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
  156. } else {
  157. err := rs.Start()
  158. if err != nil {
  159. return err
  160. }
  161. err = ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
  162. return err
  163. }
  164. }
  165. func stopRule(name string) (result string) {
  166. if rs, ok := registry.Load(name); ok {
  167. err := rs.Stop()
  168. if err != nil {
  169. conf.Log.Warn(err)
  170. }
  171. err = ruleProcessor.ExecReplaceRuleState(name, false)
  172. if err != nil {
  173. conf.Log.Warnf("stop rule found error: %s", err.Error())
  174. }
  175. result = fmt.Sprintf("Rule %s was stopped.", name)
  176. } else {
  177. result = fmt.Sprintf("Rule %s was not found.", name)
  178. }
  179. return
  180. }
  181. func restartRule(name string) error {
  182. stopRule(name)
  183. time.Sleep(1 * time.Millisecond)
  184. return startRule(name)
  185. }
  186. func getRuleStatus(name string) (string, error) {
  187. if rs, ok := registry.Load(name); ok {
  188. result, err := rs.GetState()
  189. if err != nil {
  190. return "", err
  191. }
  192. if result == "Running" {
  193. keys, values := (*rs.Topology).GetMetrics()
  194. metrics := "{"
  195. metrics += `"status": "running",`
  196. for i, key := range keys {
  197. value := values[i]
  198. switch value.(type) {
  199. case string:
  200. metrics += fmt.Sprintf("\"%s\":%q,", key, value)
  201. default:
  202. metrics += fmt.Sprintf("\"%s\":%v,", key, value)
  203. }
  204. }
  205. metrics = metrics[:len(metrics)-1] + "}"
  206. dst := &bytes.Buffer{}
  207. if err = json.Indent(dst, []byte(metrics), "", " "); err != nil {
  208. result = metrics
  209. } else {
  210. result = dst.String()
  211. }
  212. } else {
  213. result = fmt.Sprintf(`{"status": "stopped", "message": "%s"}`, result)
  214. }
  215. return result, nil
  216. } else {
  217. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
  218. }
  219. }
  220. func getAllRulesWithStatus() ([]map[string]interface{}, error) {
  221. ruleIds, err := ruleProcessor.GetAllRules()
  222. if err != nil {
  223. return nil, err
  224. }
  225. sort.Strings(ruleIds)
  226. result := make([]map[string]interface{}, len(ruleIds))
  227. for i, id := range ruleIds {
  228. ruleName := id
  229. rule, _ := ruleProcessor.GetRuleById(id)
  230. if rule != nil && rule.Name != "" {
  231. ruleName = rule.Name
  232. }
  233. s, err := getRuleState(id)
  234. if err != nil {
  235. s = fmt.Sprintf("error: %s", err)
  236. }
  237. result[i] = map[string]interface{}{
  238. "id": id,
  239. "name": ruleName,
  240. "status": s,
  241. }
  242. }
  243. return result, nil
  244. }
  245. func getRuleState(name string) (string, error) {
  246. if rs, ok := registry.Load(name); ok {
  247. return rs.GetState()
  248. } else {
  249. return "", fmt.Errorf("Rule %s is not found in registry", name)
  250. }
  251. }
  252. func getRuleTopo(name string) (string, error) {
  253. if rs, ok := registry.Load(name); ok {
  254. graph := rs.GetTopoGraph()
  255. if graph == nil {
  256. return "", errorx.New(fmt.Sprintf("Fail to get rule %s's topo, make sure the rule has been started before", name))
  257. }
  258. bs, err := json.Marshal(graph)
  259. if err != nil {
  260. return "", errorx.New(fmt.Sprintf("Fail to encode rule %s's topo", name))
  261. } else {
  262. return string(bs), nil
  263. }
  264. } else {
  265. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
  266. }
  267. }