rule_manager.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/rule"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/errorx"
  23. "github.com/lf-edge/ekuiper/pkg/infra"
  24. "sort"
  25. "sync"
  26. "time"
  27. )
  28. // Rule storage includes kv and in memory registry
  29. // Kv stores the rule text with *expected* status so that the rule can be restored after restart
  30. // Registry stores the current rule state in runtime
  31. // Here registry is the in memory registry
  32. var registry *RuleRegistry
  33. type RuleRegistry struct {
  34. sync.RWMutex
  35. internal map[string]*rule.RuleState
  36. }
  37. // Store create the in memory entry for a rule. Run in:
  38. // 1. Restore the rules from KV at startup
  39. // 2. Restore the rules when importing
  40. // 3. Create a rule
  41. func (rr *RuleRegistry) Store(key string, value *rule.RuleState) {
  42. rr.Lock()
  43. rr.internal[key] = value
  44. rr.Unlock()
  45. }
  46. // Load the entry of a rule by id. It is used to get the current rule state
  47. // or send command to a running rule
  48. func (rr *RuleRegistry) Load(key string) (value *rule.RuleState, ok bool) {
  49. rr.RLock()
  50. result, ok := rr.internal[key]
  51. rr.RUnlock()
  52. return result, ok
  53. }
  54. // Delete Atomic get and delete. Only run when deleting a rule in runtime.
  55. func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) {
  56. rr.Lock()
  57. result, ok := rr.internal[key]
  58. if ok {
  59. delete(rr.internal, key)
  60. }
  61. rr.Unlock()
  62. return result, ok
  63. }
  64. func createRule(name, ruleJson string) (string, error) {
  65. // Validate the rule json
  66. r, err := ruleProcessor.GetRuleByJson(name, ruleJson)
  67. if err != nil {
  68. return "", fmt.Errorf("Invalid rule json: %v", err)
  69. }
  70. // Validate the topo
  71. rs, err := createRuleState(r)
  72. if err != nil {
  73. return r.Id, fmt.Errorf("Create rule topo error: %v", err)
  74. }
  75. // Store to KV
  76. err = ruleProcessor.ExecCreate(r.Id, ruleJson)
  77. if err != nil {
  78. // Do not store to KV so also delete the in memory shadow
  79. deleteRule(r.Id)
  80. return r.Id, fmt.Errorf("Store the rule error: %v", err)
  81. }
  82. // Start the rule asyncly
  83. if r.Triggered {
  84. go func() {
  85. panicOrError := infra.SafeRun(func() error {
  86. //Start the rule which runs async
  87. return rs.Start()
  88. })
  89. if panicOrError != nil {
  90. logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
  91. }
  92. }()
  93. }
  94. return r.Id, nil
  95. }
  96. // Create and initialize a rule state.
  97. // Errors are possible during plan the topo.
  98. // If error happens return immediately without add it to the registry
  99. func createRuleState(r *api.Rule) (*rule.RuleState, error) {
  100. rs, err := rule.NewRuleState(r)
  101. if err != nil {
  102. return rs, err
  103. }
  104. registry.Store(r.Id, rs)
  105. return rs, nil
  106. }
  107. func recoverRule(r *api.Rule) string {
  108. // Validate the topo
  109. rs, err := createRuleState(r)
  110. if err != nil { // when recovering rules, assume the rules are valid, so always add it to the registry
  111. conf.Log.Errorf("Create rule topo error: %v", err)
  112. r.Triggered = false
  113. registry.Store(r.Id, rs)
  114. }
  115. if !r.Triggered {
  116. return fmt.Sprintf("Rule %s was stopped.", r.Id)
  117. } else {
  118. panicOrError := infra.SafeRun(func() error {
  119. //Start the rule which runs async
  120. return rs.Start()
  121. })
  122. if panicOrError != nil {
  123. return fmt.Sprintf("Rule %s start failed: %s", r.Id, panicOrError)
  124. }
  125. }
  126. return fmt.Sprintf("Rule %s was started.", r.Id)
  127. }
  128. func updateRule(ruleId, ruleJson string) error {
  129. // Validate the rule json
  130. r, err := ruleProcessor.GetRuleByJson(ruleId, ruleJson)
  131. if err != nil {
  132. return fmt.Errorf("Invalid rule json: %v", err)
  133. }
  134. if rs, ok := registry.Load(r.Id); ok {
  135. rs.UpdateTopo(r)
  136. return nil
  137. } else {
  138. return fmt.Errorf("Rule %s registry not found, try to delete it and recreate", r.Id)
  139. }
  140. }
  141. func deleteRule(name string) (result string) {
  142. if rs, ok := registry.Delete(name); ok {
  143. rs.Close()
  144. result = fmt.Sprintf("Rule %s was deleted.", name)
  145. } else {
  146. result = fmt.Sprintf("Rule %s was not found.", name)
  147. }
  148. return
  149. }
  150. func startRule(name string) error {
  151. rs, ok := registry.Load(name)
  152. if !ok {
  153. return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
  154. } else {
  155. err := rs.Start()
  156. if err != nil {
  157. return err
  158. }
  159. err = ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
  160. return err
  161. }
  162. }
  163. func stopRule(name string) (result string) {
  164. if rs, ok := registry.Load(name); ok {
  165. err := rs.Stop()
  166. if err != nil {
  167. conf.Log.Warn(err)
  168. }
  169. err = ruleProcessor.ExecReplaceRuleState(name, false)
  170. if err != nil {
  171. conf.Log.Warnf("stop rule found error: %s", err.Error())
  172. }
  173. result = fmt.Sprintf("Rule %s was stopped.", name)
  174. } else {
  175. result = fmt.Sprintf("Rule %s was not found.", name)
  176. }
  177. return
  178. }
  179. func restartRule(name string) error {
  180. stopRule(name)
  181. time.Sleep(1 * time.Millisecond)
  182. return startRule(name)
  183. }
  184. func getRuleStatus(name string) (string, error) {
  185. if rs, ok := registry.Load(name); ok {
  186. result, err := rs.GetState()
  187. if err != nil {
  188. return "", err
  189. }
  190. if result == "Running" {
  191. keys, values := (*rs.Topology).GetMetrics()
  192. metrics := "{"
  193. metrics += `"status": "running",`
  194. for i, key := range keys {
  195. value := values[i]
  196. switch value.(type) {
  197. case string:
  198. metrics += fmt.Sprintf("\"%s\":%q,", key, value)
  199. default:
  200. metrics += fmt.Sprintf("\"%s\":%v,", key, value)
  201. }
  202. }
  203. metrics = metrics[:len(metrics)-1] + "}"
  204. dst := &bytes.Buffer{}
  205. if err = json.Indent(dst, []byte(metrics), "", " "); err != nil {
  206. result = metrics
  207. } else {
  208. result = dst.String()
  209. }
  210. } else {
  211. result = fmt.Sprintf(`{"status": "stopped", "message": "%s"}`, result)
  212. }
  213. return result, nil
  214. } else {
  215. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
  216. }
  217. }
  218. func getAllRulesWithStatus() ([]map[string]interface{}, error) {
  219. ruleIds, err := ruleProcessor.GetAllRules()
  220. if err != nil {
  221. return nil, err
  222. }
  223. sort.Strings(ruleIds)
  224. result := make([]map[string]interface{}, len(ruleIds))
  225. for i, id := range ruleIds {
  226. ruleName := id
  227. rule, _ := ruleProcessor.GetRuleById(id)
  228. if rule != nil && rule.Name != "" {
  229. ruleName = rule.Name
  230. }
  231. s, err := getRuleState(id)
  232. if err != nil {
  233. s = fmt.Sprintf("error: %s", err)
  234. }
  235. result[i] = map[string]interface{}{
  236. "id": id,
  237. "name": ruleName,
  238. "status": s,
  239. }
  240. }
  241. return result, nil
  242. }
  243. func getRuleState(name string) (string, error) {
  244. if rs, ok := registry.Load(name); ok {
  245. return rs.GetState()
  246. } else {
  247. return "", fmt.Errorf("Rule %s is not found in registry", name)
  248. }
  249. }
  250. func getRuleTopo(name string) (string, error) {
  251. if rs, ok := registry.Load(name); ok {
  252. graph := rs.GetTopoGraph()
  253. if graph == nil {
  254. return "", errorx.New(fmt.Sprintf("Fail to get rule %s's topo, make sure the rule has been started before", name))
  255. }
  256. bs, err := json.Marshal(graph)
  257. if err != nil {
  258. return "", errorx.New(fmt.Sprintf("Fail to encode rule %s's topo", name))
  259. } else {
  260. return string(bs), nil
  261. }
  262. } else {
  263. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
  264. }
  265. }