rule_manager.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "sort"
  20. "sync"
  21. "time"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/topo/rule"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. "github.com/lf-edge/ekuiper/pkg/errorx"
  26. "github.com/lf-edge/ekuiper/pkg/infra"
  27. )
  28. // Rule storage includes kv and in memory registry
  29. // Kv stores the rule text with *expected* status so that the rule can be restored after restart
  30. // Registry stores the current rule state in runtime
  31. // Here registry is the in memory registry
  32. var registry *RuleRegistry
  33. type RuleRegistry struct {
  34. sync.RWMutex
  35. internal map[string]*rule.RuleState
  36. }
  37. // Store create the in memory entry for a rule. Run in:
  38. // 1. Restore the rules from KV at startup
  39. // 2. Restore the rules when importing
  40. // 3. Create a rule
  41. func (rr *RuleRegistry) Store(key string, value *rule.RuleState) {
  42. rr.Lock()
  43. rr.internal[key] = value
  44. rr.Unlock()
  45. }
  46. // Load the entry of a rule by id. It is used to get the current rule state
  47. // or send command to a running rule
  48. func (rr *RuleRegistry) Load(key string) (value *rule.RuleState, ok bool) {
  49. rr.RLock()
  50. result, ok := rr.internal[key]
  51. rr.RUnlock()
  52. return result, ok
  53. }
  54. // Delete Atomic get and delete. Only run when deleting a rule in runtime.
  55. func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) {
  56. rr.Lock()
  57. result, ok := rr.internal[key]
  58. if ok {
  59. delete(rr.internal, key)
  60. }
  61. rr.Unlock()
  62. return result, ok
  63. }
  64. func createRule(name, ruleJson string) (string, error) {
  65. var rs *rule.RuleState = nil
  66. var err error = nil
  67. // Validate the rule json
  68. r, err := ruleProcessor.GetRuleByJson(name, ruleJson)
  69. if err != nil {
  70. return "", fmt.Errorf("invalid rule json: %v", err)
  71. }
  72. // Store to KV
  73. err = ruleProcessor.ExecCreate(r.Id, ruleJson)
  74. if err != nil {
  75. return r.Id, fmt.Errorf("store the rule error: %v", err)
  76. }
  77. // Validate the topo
  78. panicOrError := infra.SafeRun(func() error {
  79. rs, err = createRuleState(r)
  80. return err
  81. })
  82. if panicOrError != nil {
  83. // Do not store to registry so also delete the KV
  84. deleteRule(r.Id)
  85. _, _ = ruleProcessor.ExecDrop(r.Id)
  86. return r.Id, fmt.Errorf("create rule topo error: %v", panicOrError)
  87. }
  88. // Start the rule asyncly
  89. if r.Triggered {
  90. go func() {
  91. panicOrError := infra.SafeRun(func() error {
  92. // Start the rule which runs async
  93. return rs.Start()
  94. })
  95. if panicOrError != nil {
  96. logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
  97. }
  98. }()
  99. }
  100. return r.Id, nil
  101. }
  102. // Create and initialize a rule state.
  103. // Errors are possible during plan the topo.
  104. // If error happens return immediately without add it to the registry
  105. func createRuleState(r *api.Rule) (*rule.RuleState, error) {
  106. rs, err := rule.NewRuleState(r)
  107. if err != nil {
  108. return rs, err
  109. }
  110. registry.Store(r.Id, rs)
  111. return rs, nil
  112. }
  113. func recoverRule(r *api.Rule) string {
  114. var rs *rule.RuleState = nil
  115. var err error = nil
  116. // Validate the topo
  117. panicOrError := infra.SafeRun(func() error {
  118. rs, err = createRuleState(r)
  119. return err
  120. })
  121. if panicOrError != nil { // when recovering rules, assume the rules are valid, so always add it to the registry
  122. conf.Log.Errorf("Create rule topo error: %v", err)
  123. r.Triggered = false
  124. registry.Store(r.Id, rs)
  125. }
  126. if !r.Triggered {
  127. return fmt.Sprintf("Rule %s was stopped.", r.Id)
  128. } else {
  129. panicOrError := infra.SafeRun(func() error {
  130. // Start the rule which runs async
  131. return rs.Start()
  132. })
  133. if panicOrError != nil {
  134. return fmt.Sprintf("Rule %s start failed: %s", r.Id, panicOrError)
  135. }
  136. }
  137. return fmt.Sprintf("Rule %s was started.", r.Id)
  138. }
  139. func updateRule(ruleId, ruleJson string) error {
  140. // Validate the rule json
  141. r, err := ruleProcessor.GetRuleByJson(ruleId, ruleJson)
  142. if err != nil {
  143. return fmt.Errorf("Invalid rule json: %v", err)
  144. }
  145. if rs, ok := registry.Load(r.Id); ok {
  146. err := rs.UpdateTopo(r)
  147. if err != nil {
  148. return err
  149. }
  150. err = ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
  151. return err
  152. } else {
  153. return fmt.Errorf("Rule %s registry not found, try to delete it and recreate", r.Id)
  154. }
  155. }
  156. func deleteRule(name string) (result string) {
  157. if rs, ok := registry.Delete(name); ok {
  158. rs.Close()
  159. result = fmt.Sprintf("Rule %s was deleted.", name)
  160. } else {
  161. result = fmt.Sprintf("Rule %s was not found.", name)
  162. }
  163. return
  164. }
  165. func startRule(name string) error {
  166. return reRunRule(name)
  167. }
  168. // reRunRule rerun the rule from optimize to Open the operator in order to refresh the schema
  169. func reRunRule(name string) error {
  170. rs, ok := registry.Load(name)
  171. if !ok {
  172. return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
  173. } else {
  174. return rs.UpdateTopo(rs.Rule)
  175. }
  176. }
  177. func stopRule(name string) (result string) {
  178. if rs, ok := registry.Load(name); ok {
  179. err := rs.Stop()
  180. if err != nil {
  181. conf.Log.Warn(err)
  182. }
  183. err = ruleProcessor.ExecReplaceRuleState(name, false)
  184. if err != nil {
  185. conf.Log.Warnf("stop rule found error: %s", err.Error())
  186. }
  187. result = fmt.Sprintf("Rule %s was stopped.", name)
  188. } else {
  189. result = fmt.Sprintf("Rule %s was not found.", name)
  190. }
  191. return
  192. }
  193. func restartRule(name string) error {
  194. stopRule(name)
  195. time.Sleep(1 * time.Millisecond)
  196. return startRule(name)
  197. }
  198. func getRuleStatus(name string) (string, error) {
  199. if rs, ok := registry.Load(name); ok {
  200. result, err := rs.GetState()
  201. if err != nil {
  202. return "", err
  203. }
  204. if result == "Running" {
  205. keys, values := (*rs.Topology).GetMetrics()
  206. metrics := "{"
  207. metrics += `"status": "running",`
  208. for i, key := range keys {
  209. value := values[i]
  210. switch value.(type) {
  211. case string:
  212. metrics += fmt.Sprintf("\"%s\":%q,", key, value)
  213. default:
  214. metrics += fmt.Sprintf("\"%s\":%v,", key, value)
  215. }
  216. }
  217. metrics = metrics[:len(metrics)-1] + "}"
  218. dst := &bytes.Buffer{}
  219. if err = json.Indent(dst, []byte(metrics), "", " "); err != nil {
  220. result = metrics
  221. } else {
  222. result = dst.String()
  223. }
  224. } else {
  225. result = fmt.Sprintf(`{"status": "stopped", "message": "%s"}`, result)
  226. }
  227. return result, nil
  228. } else {
  229. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
  230. }
  231. }
  232. func getAllRulesWithStatus() ([]map[string]interface{}, error) {
  233. ruleIds, err := ruleProcessor.GetAllRules()
  234. if err != nil {
  235. return nil, err
  236. }
  237. sort.Strings(ruleIds)
  238. result := make([]map[string]interface{}, len(ruleIds))
  239. for i, id := range ruleIds {
  240. ruleName := id
  241. rule, _ := ruleProcessor.GetRuleById(id)
  242. if rule != nil && rule.Name != "" {
  243. ruleName = rule.Name
  244. }
  245. s, err := getRuleState(id)
  246. if err != nil {
  247. s = fmt.Sprintf("error: %s", err)
  248. }
  249. result[i] = map[string]interface{}{
  250. "id": id,
  251. "name": ruleName,
  252. "status": s,
  253. }
  254. }
  255. return result, nil
  256. }
  257. func getRuleState(name string) (string, error) {
  258. if rs, ok := registry.Load(name); ok {
  259. return rs.GetState()
  260. } else {
  261. return "", fmt.Errorf("Rule %s is not found in registry", name)
  262. }
  263. }
  264. func getRuleTopo(name string) (string, error) {
  265. if rs, ok := registry.Load(name); ok {
  266. graph := rs.GetTopoGraph()
  267. if graph == nil {
  268. return "", errorx.New(fmt.Sprintf("Fail to get rule %s's topo, make sure the rule has been started before", name))
  269. }
  270. bs, err := json.Marshal(graph)
  271. if err != nil {
  272. return "", errorx.New(fmt.Sprintf("Fail to encode rule %s's topo", name))
  273. } else {
  274. return string(bs), nil
  275. }
  276. } else {
  277. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
  278. }
  279. }