rule_manager.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "sort"
  20. "sync"
  21. "time"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/topo/rule"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. "github.com/lf-edge/ekuiper/pkg/cast"
  26. "github.com/lf-edge/ekuiper/pkg/errorx"
  27. "github.com/lf-edge/ekuiper/pkg/infra"
  28. )
  29. // Rule storage includes kv and in memory registry
  30. // Kv stores the rule text with *expected* status so that the rule can be restored after restart
  31. // Registry stores the current rule state in runtime
  32. // Here registry is the in memory registry
  33. var registry *RuleRegistry
  34. type RuleRegistry struct {
  35. sync.RWMutex
  36. internal map[string]*rule.RuleState
  37. }
  38. // Store create the in memory entry for a rule. Run in:
  39. // 1. Restore the rules from KV at startup
  40. // 2. Restore the rules when importing
  41. // 3. Create a rule
  42. func (rr *RuleRegistry) Store(key string, value *rule.RuleState) {
  43. rr.Lock()
  44. rr.internal[key] = value
  45. rr.Unlock()
  46. }
  47. // Load the entry of a rule by id. It is used to get the current rule state
  48. // or send command to a running rule
  49. func (rr *RuleRegistry) Load(key string) (value *rule.RuleState, ok bool) {
  50. rr.RLock()
  51. result, ok := rr.internal[key]
  52. rr.RUnlock()
  53. return result, ok
  54. }
  55. // Delete Atomic get and delete. Only run when deleting a rule in runtime.
  56. func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) {
  57. rr.Lock()
  58. result, ok := rr.internal[key]
  59. if ok {
  60. delete(rr.internal, key)
  61. }
  62. rr.Unlock()
  63. return result, ok
  64. }
  65. func createRule(name, ruleJson string) (string, error) {
  66. var rs *rule.RuleState = nil
  67. var err error = nil
  68. // Validate the rule json
  69. r, err := ruleProcessor.GetRuleByJson(name, ruleJson)
  70. if err != nil {
  71. return "", fmt.Errorf("invalid rule json: %v", err)
  72. }
  73. // Store to KV
  74. err = ruleProcessor.ExecCreate(r.Id, ruleJson)
  75. if err != nil {
  76. return r.Id, fmt.Errorf("store the rule error: %v", err)
  77. }
  78. // Validate the topo
  79. panicOrError := infra.SafeRun(func() error {
  80. rs, err = createRuleState(r)
  81. return err
  82. })
  83. if panicOrError != nil {
  84. // Do not store to registry so also delete the KV
  85. deleteRule(r.Id)
  86. _, _ = ruleProcessor.ExecDrop(r.Id)
  87. return r.Id, fmt.Errorf("create rule topo error: %v", panicOrError)
  88. }
  89. // Start the rule asyncly
  90. if r.Triggered {
  91. go func() {
  92. panicOrError := infra.SafeRun(func() error {
  93. // Start the rule which runs async
  94. return rs.Start()
  95. })
  96. if panicOrError != nil {
  97. logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
  98. }
  99. }()
  100. }
  101. return r.Id, nil
  102. }
  103. // Create and initialize a rule state.
  104. // Errors are possible during plan the topo.
  105. // If error happens return immediately without add it to the registry
  106. func createRuleState(r *api.Rule) (*rule.RuleState, error) {
  107. rs, err := rule.NewRuleState(r)
  108. if err != nil {
  109. return rs, err
  110. }
  111. registry.Store(r.Id, rs)
  112. return rs, nil
  113. }
  114. func recoverRule(r *api.Rule) string {
  115. var rs *rule.RuleState = nil
  116. var err error = nil
  117. // Validate the topo
  118. panicOrError := infra.SafeRun(func() error {
  119. rs, err = createRuleState(r)
  120. return err
  121. })
  122. if panicOrError != nil { // when recovering rules, assume the rules are valid, so always add it to the registry
  123. conf.Log.Errorf("Create rule topo error: %v", err)
  124. r.Triggered = false
  125. registry.Store(r.Id, rs)
  126. }
  127. if !r.Triggered {
  128. return fmt.Sprintf("Rule %s was stopped.", r.Id)
  129. } else {
  130. panicOrError := infra.SafeRun(func() error {
  131. // Start the rule which runs async
  132. return rs.Start()
  133. })
  134. if panicOrError != nil {
  135. return fmt.Sprintf("Rule %s start failed: %s", r.Id, panicOrError)
  136. }
  137. }
  138. return fmt.Sprintf("Rule %s was started.", r.Id)
  139. }
  140. func updateRule(ruleId, ruleJson string) error {
  141. // Validate the rule json
  142. r, err := ruleProcessor.GetRuleByJson(ruleId, ruleJson)
  143. if err != nil {
  144. return fmt.Errorf("Invalid rule json: %v", err)
  145. }
  146. if rs, ok := registry.Load(r.Id); ok {
  147. err := rs.UpdateTopo(r)
  148. if err != nil {
  149. return err
  150. }
  151. err = ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
  152. return err
  153. } else {
  154. return fmt.Errorf("Rule %s registry not found, try to delete it and recreate", r.Id)
  155. }
  156. }
  157. func deleteRule(name string) (result string) {
  158. if rs, ok := registry.Delete(name); ok {
  159. rs.Close()
  160. result = fmt.Sprintf("Rule %s was deleted.", name)
  161. } else {
  162. result = fmt.Sprintf("Rule %s was not found.", name)
  163. }
  164. return
  165. }
  166. func startRule(name string) error {
  167. return reRunRule(name)
  168. }
  169. // reRunRule rerun the rule from optimize to Open the operator in order to refresh the schema
  170. func reRunRule(name string) error {
  171. rs, ok := registry.Load(name)
  172. if !ok {
  173. return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
  174. } else {
  175. if err := ruleProcessor.ExecReplaceRuleState(rs.RuleId, true); err != nil {
  176. return err
  177. }
  178. return rs.UpdateTopo(rs.Rule)
  179. }
  180. }
  181. func stopRule(name string) (result string) {
  182. if rs, ok := registry.Load(name); ok {
  183. err := rs.Stop()
  184. if err != nil {
  185. conf.Log.Warn(err)
  186. }
  187. err = ruleProcessor.ExecReplaceRuleState(name, false)
  188. if err != nil {
  189. conf.Log.Warnf("stop rule found error: %s", err.Error())
  190. }
  191. result = fmt.Sprintf("Rule %s was stopped.", name)
  192. } else {
  193. result = fmt.Sprintf("Rule %s was not found.", name)
  194. }
  195. return
  196. }
  197. func restartRule(name string) error {
  198. stopRule(name)
  199. time.Sleep(1 * time.Millisecond)
  200. return startRule(name)
  201. }
  202. func getRuleStatus(name string) (string, error) {
  203. if rs, ok := registry.Load(name); ok {
  204. result, err := rs.GetState()
  205. if err != nil {
  206. return "", err
  207. }
  208. if result == "Running" {
  209. keys, values := (*rs.Topology).GetMetrics()
  210. metrics := "{"
  211. metrics += `"status": "running",`
  212. for i, key := range keys {
  213. value := values[i]
  214. switch value.(type) {
  215. case string:
  216. metrics += fmt.Sprintf("\"%s\":%q,", key, value)
  217. default:
  218. metrics += fmt.Sprintf("\"%s\":%v,", key, value)
  219. }
  220. }
  221. metrics = metrics[:len(metrics)-1] + "}"
  222. dst := &bytes.Buffer{}
  223. if err = json.Indent(dst, cast.StringToBytes(metrics), "", " "); err != nil {
  224. result = metrics
  225. } else {
  226. result = dst.String()
  227. }
  228. } else {
  229. result = fmt.Sprintf(`{"status": "stopped", "message": "%s"}`, result)
  230. }
  231. return result, nil
  232. } else {
  233. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
  234. }
  235. }
  236. func getAllRulesWithStatus() ([]map[string]interface{}, error) {
  237. ruleIds, err := ruleProcessor.GetAllRules()
  238. if err != nil {
  239. return nil, err
  240. }
  241. sort.Strings(ruleIds)
  242. result := make([]map[string]interface{}, len(ruleIds))
  243. for i, id := range ruleIds {
  244. ruleName := id
  245. rule, _ := ruleProcessor.GetRuleById(id)
  246. if rule != nil && rule.Name != "" {
  247. ruleName = rule.Name
  248. }
  249. s, err := getRuleState(id)
  250. if err != nil {
  251. s = fmt.Sprintf("error: %s", err)
  252. }
  253. result[i] = map[string]interface{}{
  254. "id": id,
  255. "name": ruleName,
  256. "status": s,
  257. }
  258. }
  259. return result, nil
  260. }
  261. type ruleWrapper struct {
  262. rule *api.Rule
  263. state string
  264. }
  265. func getAllRulesWithState() ([]ruleWrapper, error) {
  266. ruleIds, err := ruleProcessor.GetAllRules()
  267. if err != nil {
  268. return nil, err
  269. }
  270. sort.Strings(ruleIds)
  271. rules := make([]ruleWrapper, 0, len(ruleIds))
  272. for _, id := range ruleIds {
  273. r, err := ruleProcessor.GetRuleById(id)
  274. if err != nil {
  275. return nil, err
  276. }
  277. s, err := getRuleState(id)
  278. if err != nil {
  279. return nil, err
  280. }
  281. rules = append(rules, ruleWrapper{rule: r, state: s})
  282. }
  283. return rules, nil
  284. }
  285. func getRuleState(name string) (string, error) {
  286. if rs, ok := registry.Load(name); ok {
  287. return rs.GetState()
  288. } else {
  289. return "", fmt.Errorf("Rule %s is not found in registry", name)
  290. }
  291. }
  292. func getRuleTopo(name string) (string, error) {
  293. if rs, ok := registry.Load(name); ok {
  294. graph := rs.GetTopoGraph()
  295. if graph == nil {
  296. return "", errorx.New(fmt.Sprintf("Fail to get rule %s's topo, make sure the rule has been started before", name))
  297. }
  298. bs, err := json.Marshal(graph)
  299. if err != nil {
  300. return "", errorx.New(fmt.Sprintf("Fail to encode rule %s's topo", name))
  301. } else {
  302. return string(bs), nil
  303. }
  304. } else {
  305. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
  306. }
  307. }
  308. func validateRule(name, ruleJson string) (bool, error) {
  309. // Validate the rule json
  310. _, err := ruleProcessor.GetRuleByJson(name, ruleJson)
  311. if err != nil {
  312. return false, fmt.Errorf("invalid rule json: %v", err)
  313. }
  314. return true, nil
  315. }