rule_manager.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/rule"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/errorx"
  23. "github.com/lf-edge/ekuiper/pkg/infra"
  24. "sort"
  25. "sync"
  26. "time"
  27. )
  28. // Rule storage includes kv and in memory registry
  29. // Kv stores the rule text with *expected* status so that the rule can be restored after restart
  30. // Registry stores the current rule state in runtime
  31. // Here registry is the in memory registry
  32. var registry *RuleRegistry
  33. type RuleRegistry struct {
  34. sync.RWMutex
  35. internal map[string]*rule.RuleState
  36. }
  37. // Store create the in memory entry for a rule. Run in:
  38. // 1. Restore the rules from KV at startup
  39. // 2. Restore the rules when importing
  40. // 3. Create a rule
  41. func (rr *RuleRegistry) Store(key string, value *rule.RuleState) {
  42. rr.Lock()
  43. rr.internal[key] = value
  44. rr.Unlock()
  45. }
  46. // Load the entry of a rule by id. It is used to get the current rule state
  47. // or send command to a running rule
  48. func (rr *RuleRegistry) Load(key string) (value *rule.RuleState, ok bool) {
  49. rr.RLock()
  50. result, ok := rr.internal[key]
  51. rr.RUnlock()
  52. return result, ok
  53. }
  54. // Delete Atomic get and delete. Only run when deleting a rule in runtime.
  55. func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) {
  56. rr.Lock()
  57. result, ok := rr.internal[key]
  58. if ok {
  59. delete(rr.internal, key)
  60. }
  61. rr.Unlock()
  62. return result, ok
  63. }
  64. func createRule(name, ruleJson string) (string, error) {
  65. // Validate the rule json
  66. r, err := ruleProcessor.GetRuleByJson(name, ruleJson)
  67. if err != nil {
  68. return "", fmt.Errorf("invalid rule json: %v", err)
  69. }
  70. // Store to KV
  71. err = ruleProcessor.ExecCreate(r.Id, ruleJson)
  72. if err != nil {
  73. return r.Id, fmt.Errorf("store the rule error: %v", err)
  74. }
  75. // Validate the topo
  76. rs, err := createRuleState(r)
  77. if err != nil {
  78. // Do not store to registry so also delete the KV
  79. deleteRule(r.Id)
  80. _, _ = ruleProcessor.ExecDrop(r.Id)
  81. return r.Id, fmt.Errorf("create rule topo error: %v", err)
  82. }
  83. // Start the rule asyncly
  84. if r.Triggered {
  85. go func() {
  86. panicOrError := infra.SafeRun(func() error {
  87. //Start the rule which runs async
  88. return rs.Start()
  89. })
  90. if panicOrError != nil {
  91. logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
  92. }
  93. }()
  94. }
  95. return r.Id, nil
  96. }
  97. // Create and initialize a rule state.
  98. // Errors are possible during plan the topo.
  99. // If error happens return immediately without add it to the registry
  100. func createRuleState(r *api.Rule) (*rule.RuleState, error) {
  101. rs, err := rule.NewRuleState(r)
  102. if err != nil {
  103. return rs, err
  104. }
  105. registry.Store(r.Id, rs)
  106. return rs, nil
  107. }
  108. func recoverRule(r *api.Rule) string {
  109. // Validate the topo
  110. rs, err := createRuleState(r)
  111. if err != nil { // when recovering rules, assume the rules are valid, so always add it to the registry
  112. conf.Log.Errorf("Create rule topo error: %v", err)
  113. r.Triggered = false
  114. registry.Store(r.Id, rs)
  115. }
  116. if !r.Triggered {
  117. return fmt.Sprintf("Rule %s was stopped.", r.Id)
  118. } else {
  119. panicOrError := infra.SafeRun(func() error {
  120. //Start the rule which runs async
  121. return rs.Start()
  122. })
  123. if panicOrError != nil {
  124. return fmt.Sprintf("Rule %s start failed: %s", r.Id, panicOrError)
  125. }
  126. }
  127. return fmt.Sprintf("Rule %s was started.", r.Id)
  128. }
  129. func updateRule(ruleId, ruleJson string) error {
  130. // Validate the rule json
  131. r, err := ruleProcessor.GetRuleByJson(ruleId, ruleJson)
  132. if err != nil {
  133. return fmt.Errorf("Invalid rule json: %v", err)
  134. }
  135. if rs, ok := registry.Load(r.Id); ok {
  136. err := rs.UpdateTopo(r)
  137. if err != nil {
  138. return err
  139. }
  140. err = ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
  141. return err
  142. } else {
  143. return fmt.Errorf("Rule %s registry not found, try to delete it and recreate", r.Id)
  144. }
  145. }
  146. func deleteRule(name string) (result string) {
  147. if rs, ok := registry.Delete(name); ok {
  148. rs.Close()
  149. result = fmt.Sprintf("Rule %s was deleted.", name)
  150. } else {
  151. result = fmt.Sprintf("Rule %s was not found.", name)
  152. }
  153. return
  154. }
  155. func startRule(name string) error {
  156. rs, ok := registry.Load(name)
  157. if !ok {
  158. return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
  159. } else {
  160. err := rs.Start()
  161. if err != nil {
  162. return err
  163. }
  164. err = ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
  165. return err
  166. }
  167. }
  168. func stopRule(name string) (result string) {
  169. if rs, ok := registry.Load(name); ok {
  170. err := rs.Stop()
  171. if err != nil {
  172. conf.Log.Warn(err)
  173. }
  174. err = ruleProcessor.ExecReplaceRuleState(name, false)
  175. if err != nil {
  176. conf.Log.Warnf("stop rule found error: %s", err.Error())
  177. }
  178. result = fmt.Sprintf("Rule %s was stopped.", name)
  179. } else {
  180. result = fmt.Sprintf("Rule %s was not found.", name)
  181. }
  182. return
  183. }
  184. func restartRule(name string) error {
  185. stopRule(name)
  186. time.Sleep(1 * time.Millisecond)
  187. return startRule(name)
  188. }
  189. func getRuleStatus(name string) (string, error) {
  190. if rs, ok := registry.Load(name); ok {
  191. result, err := rs.GetState()
  192. if err != nil {
  193. return "", err
  194. }
  195. if result == "Running" {
  196. keys, values := (*rs.Topology).GetMetrics()
  197. metrics := "{"
  198. metrics += `"status": "running",`
  199. for i, key := range keys {
  200. value := values[i]
  201. switch value.(type) {
  202. case string:
  203. metrics += fmt.Sprintf("\"%s\":%q,", key, value)
  204. default:
  205. metrics += fmt.Sprintf("\"%s\":%v,", key, value)
  206. }
  207. }
  208. metrics = metrics[:len(metrics)-1] + "}"
  209. dst := &bytes.Buffer{}
  210. if err = json.Indent(dst, []byte(metrics), "", " "); err != nil {
  211. result = metrics
  212. } else {
  213. result = dst.String()
  214. }
  215. } else {
  216. result = fmt.Sprintf(`{"status": "stopped", "message": "%s"}`, result)
  217. }
  218. return result, nil
  219. } else {
  220. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
  221. }
  222. }
  223. func getAllRulesWithStatus() ([]map[string]interface{}, error) {
  224. ruleIds, err := ruleProcessor.GetAllRules()
  225. if err != nil {
  226. return nil, err
  227. }
  228. sort.Strings(ruleIds)
  229. result := make([]map[string]interface{}, len(ruleIds))
  230. for i, id := range ruleIds {
  231. ruleName := id
  232. rule, _ := ruleProcessor.GetRuleById(id)
  233. if rule != nil && rule.Name != "" {
  234. ruleName = rule.Name
  235. }
  236. s, err := getRuleState(id)
  237. if err != nil {
  238. s = fmt.Sprintf("error: %s", err)
  239. }
  240. result[i] = map[string]interface{}{
  241. "id": id,
  242. "name": ruleName,
  243. "status": s,
  244. }
  245. }
  246. return result, nil
  247. }
  248. func getRuleState(name string) (string, error) {
  249. if rs, ok := registry.Load(name); ok {
  250. return rs.GetState()
  251. } else {
  252. return "", fmt.Errorf("Rule %s is not found in registry", name)
  253. }
  254. }
  255. func getRuleTopo(name string) (string, error) {
  256. if rs, ok := registry.Load(name); ok {
  257. graph := rs.GetTopoGraph()
  258. if graph == nil {
  259. return "", errorx.New(fmt.Sprintf("Fail to get rule %s's topo, make sure the rule has been started before", name))
  260. }
  261. bs, err := json.Marshal(graph)
  262. if err != nil {
  263. return "", errorx.New(fmt.Sprintf("Fail to encode rule %s's topo", name))
  264. } else {
  265. return string(bs), nil
  266. }
  267. } else {
  268. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
  269. }
  270. }