ruleState.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rule
  15. import (
  16. "context"
  17. "fmt"
  18. "math"
  19. "math/rand"
  20. "sync"
  21. "time"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/topo"
  24. "github.com/lf-edge/ekuiper/internal/topo/planner"
  25. "github.com/lf-edge/ekuiper/pkg/api"
  26. "github.com/lf-edge/ekuiper/pkg/infra"
  27. )
  // ActionSignal is a command sent over RuleState.ActionCh to the action loop.
  type ActionSignal int

  // Signals understood by the action loop started in RuleState.Run.
  const (
  	// ActionSignalStart asks the action loop to launch the topo running loop.
  	ActionSignalStart ActionSignal = iota
  	// ActionSignalStop asks the action loop to cancel the topo running loop.
  	ActionSignalStop
  )
  33. /*********
  34. * RuleState is created for each rule. Each ruleState runs two loops:
  35. * 1. action event loop to accept commands, such as start, stop, getStatus, delete
  36. * 2. topo running loop
  37. * Both loops need to access the status, so lock is needed
  38. */
  39. type RuleState struct {
  40. // Constant, never change. Channel to send signals to manage connection retry. When deleting the rule, close it.
  41. RuleId string
  42. ActionCh chan ActionSignal
  43. // Nearly constant, only change when update the rule
  44. Rule *api.Rule
  45. // States, create through rule in each rule start
  46. Topology *topo.Topo
  47. // 0 stop, 1 start, -1 delete, changed in actions
  48. triggered int
  49. // temporary storage for topo graph to make sure even rule close, the graph is still available
  50. topoGraph *api.PrintableTopo
  51. sync.RWMutex
  52. }
  53. // NewRuleState Create and initialize a rule state.
  54. // Errors are possible during plan the topo.
  55. // If error happens return immediately without add it to the registry
  56. func NewRuleState(rule *api.Rule) (*RuleState, error) {
  57. rs := &RuleState{
  58. RuleId: rule.Id,
  59. Rule: rule,
  60. ActionCh: make(chan ActionSignal),
  61. }
  62. rs.Run()
  63. if tp, err := planner.Plan(rule); err != nil {
  64. return rs, err
  65. } else {
  66. rs.Topology = tp
  67. return rs, nil
  68. }
  69. }
  70. // UpdateTopo update the rule and the topology AND restart the topology
  71. // Do not need to call restart after update
  72. func (rs *RuleState) UpdateTopo(rule *api.Rule) error {
  73. if tp, err := planner.Plan(rule); err != nil {
  74. return err
  75. } else {
  76. rs.Lock()
  77. defer rs.Unlock()
  78. // Update rule
  79. rs.Rule = rule
  80. rs.topoGraph = nil
  81. // Stop the old topo
  82. if rs.triggered == 1 {
  83. rs.Topology.Cancel()
  84. rs.ActionCh <- ActionSignalStop
  85. // wait a little to make sure the old topo is stopped
  86. time.Sleep(1 * time.Millisecond)
  87. }
  88. // Update the topo and start
  89. rs.Topology = tp
  90. rs.triggered = 1
  91. rs.ActionCh <- ActionSignalStart
  92. return nil
  93. }
  94. }
  95. // Run start to run the two loops, do not access any changeable states
  96. func (rs *RuleState) Run() {
  97. var (
  98. ctx context.Context
  99. cancel context.CancelFunc
  100. )
  101. // action loop, once start never end until the rule is deleted
  102. go func() {
  103. conf.Log.Infof("Start rulestate %s", rs.RuleId)
  104. for {
  105. s, opened := <-rs.ActionCh
  106. if !opened {
  107. conf.Log.Infof("Stop rulestate %s", rs.RuleId)
  108. if cancel != nil {
  109. cancel()
  110. }
  111. return
  112. }
  113. switch s {
  114. case ActionSignalStart:
  115. if ctx != nil {
  116. conf.Log.Warnf("rule %s is already started", rs.RuleId)
  117. } else {
  118. ctx, cancel = context.WithCancel(context.Background())
  119. go rs.runTopo(ctx)
  120. }
  121. case ActionSignalStop:
  122. // Stop the running loop
  123. if cancel != nil {
  124. cancel()
  125. ctx = nil
  126. cancel = nil
  127. } else {
  128. conf.Log.Warnf("rule %s is already stopped", rs.RuleId)
  129. }
  130. }
  131. }
  132. }()
  133. }
  134. func (rs *RuleState) runTopo(ctx context.Context) {
  135. // Load the changeable states once
  136. rs.Lock()
  137. tp := rs.Topology
  138. option := rs.Rule.Options.Restart
  139. rs.Unlock()
  140. if tp == nil {
  141. conf.Log.Warnf("rule %s is not initialized or just stopped", rs.RuleId)
  142. return
  143. }
  144. err := infra.SafeRun(func() error {
  145. count := 0
  146. d := option.Delay
  147. var (
  148. er error
  149. )
  150. ticker := time.NewTicker(time.Duration(d) * time.Millisecond)
  151. defer ticker.Stop()
  152. for {
  153. select {
  154. case e := <-tp.Open():
  155. er = e
  156. if er != nil { // Only restart rule for errors
  157. tp.GetContext().SetError(er)
  158. conf.Log.Errorf("closing rule %s for error: %v", rs.RuleId, er)
  159. tp.Cancel()
  160. } else { // exit normally
  161. return nil
  162. }
  163. }
  164. if count < option.Attempts {
  165. if d > option.MaxDelay {
  166. d = option.MaxDelay
  167. }
  168. if option.JitterFactor > 0 {
  169. d = int(math.Round(float64(d) * ((rand.Float64()*2-1)*0.1 + 1)))
  170. conf.Log.Infof("Rule %s will restart with jitterred delay %d", rs.RuleId, d)
  171. } else {
  172. conf.Log.Infof("Rule %s will restart with delay %d", rs.RuleId, d)
  173. }
  174. // retry after delay
  175. select {
  176. case <-ticker.C:
  177. break
  178. case <-ctx.Done():
  179. conf.Log.Errorf("stop rule %s retry as cancelled", rs.RuleId)
  180. return nil
  181. }
  182. count++
  183. if option.Multiplier > 0 {
  184. d = option.Delay * int(math.Pow(option.Multiplier, float64(count)))
  185. }
  186. } else {
  187. return er
  188. }
  189. }
  190. })
  191. if err != nil { // Exit after retries
  192. rs.Lock()
  193. // The only change the state by error
  194. if rs.triggered != -1 {
  195. rs.triggered = 0
  196. if rs.Topology != nil {
  197. rs.topoGraph = rs.Topology.GetTopo()
  198. }
  199. rs.ActionCh <- ActionSignalStop
  200. }
  201. rs.Unlock()
  202. }
  203. }
  204. // The action functions are state machine.
  205. func (rs *RuleState) Start() error {
  206. rs.Lock()
  207. defer rs.Unlock()
  208. if rs.triggered == -1 {
  209. return fmt.Errorf("rule %s is already deleted", rs.RuleId)
  210. }
  211. if rs.triggered != 1 {
  212. // If the rule has been stopped due to error, the topology is not nil
  213. if rs.Topology != nil {
  214. rs.Topology.Cancel()
  215. }
  216. if tp, err := planner.Plan(rs.Rule); err != nil {
  217. return err
  218. } else {
  219. rs.Topology = tp
  220. }
  221. rs.triggered = 1
  222. }
  223. rs.ActionCh <- ActionSignalStart
  224. return nil
  225. }
  226. // Stop remove the Topology
  227. func (rs *RuleState) Stop() error {
  228. rs.Lock()
  229. defer rs.Unlock()
  230. if rs.triggered == -1 {
  231. return fmt.Errorf("rule %s is already deleted", rs.RuleId)
  232. }
  233. rs.triggered = 0
  234. if rs.Topology != nil {
  235. rs.Topology.Cancel()
  236. }
  237. rs.ActionCh <- ActionSignalStop
  238. return nil
  239. }
  240. func (rs *RuleState) Close() error {
  241. rs.Lock()
  242. defer rs.Unlock()
  243. if rs.Topology != nil {
  244. rs.Topology.RemoveMetrics()
  245. }
  246. if rs.triggered == 1 && rs.Topology != nil {
  247. rs.Topology.Cancel()
  248. }
  249. rs.triggered = -1
  250. close(rs.ActionCh)
  251. return nil
  252. }
  253. func (rs *RuleState) GetState() (string, error) {
  254. rs.RLock()
  255. defer rs.RUnlock()
  256. result := ""
  257. if rs.Topology == nil {
  258. result = "Stopped: fail to create the topo."
  259. } else {
  260. c := (*rs.Topology).GetContext()
  261. if c != nil {
  262. err := c.Err()
  263. switch err {
  264. case nil:
  265. result = "Running"
  266. case context.Canceled:
  267. result = "Stopped: canceled manually."
  268. case context.DeadlineExceeded:
  269. result = "Stopped: deadline exceed."
  270. default:
  271. result = fmt.Sprintf("Stopped: %v.", err)
  272. }
  273. } else {
  274. result = "Stopped: canceled manually."
  275. }
  276. }
  277. return result, nil
  278. }
  279. func (rs *RuleState) GetTopoGraph() *api.PrintableTopo {
  280. rs.RLock()
  281. defer rs.RUnlock()
  282. if rs.topoGraph != nil {
  283. return rs.topoGraph
  284. } else if rs.Topology != nil {
  285. return rs.Topology.GetTopo()
  286. } else {
  287. return nil
  288. }
  289. }