ruleState.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rule
  15. import (
  16. "context"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo"
  20. "github.com/lf-edge/ekuiper/internal/topo/planner"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/infra"
  23. "math"
  24. "math/rand"
  25. "sync"
  26. "time"
  27. )
  // ActionSignal is a command delivered to a rule's action event loop through
  // RuleState.ActionCh.
  type ActionSignal int

  // Signals understood by the action event loop started in RuleState.Run.
  const (
  	// ActionSignalStart asks the action loop to launch the topology run.
  	ActionSignalStart ActionSignal = 0
  	// ActionSignalStop asks the action loop to cancel the topology run.
  	ActionSignalStop ActionSignal = 1
  )
  33. /*********
  34. * RuleState is created for each rule. Each ruleState runs two loops:
  35. * 1. action event loop to accept commands, such as start, stop, getStatus, delete
  36. * 2. topo running loop
  37. * Both loops need to access the status, so lock is needed
  38. */
  39. type RuleState struct {
  40. // Constant, never change. Channel to send signals to manage connection retry. When deleting the rule, close it.
  41. RuleId string
  42. ActionCh chan ActionSignal
  43. // Nearly constant, only change when update the rule
  44. Rule *api.Rule
  45. // States, create through rule in each rule start
  46. Topology *topo.Topo
  47. // 0 stop, 1 start, -1 delete, changed in actions
  48. triggered int
  49. // temporary storage for topo graph to make sure even rule close, the graph is still available
  50. topoGraph *api.PrintableTopo
  51. sync.RWMutex
  52. }
  53. // NewRuleState Create and initialize a rule state.
  54. // Errors are possible during plan the topo.
  55. // If error happens return immediately without add it to the registry
  56. func NewRuleState(rule *api.Rule) (*RuleState, error) {
  57. if tp, err := planner.Plan(rule); err != nil {
  58. return nil, err
  59. } else {
  60. rs := &RuleState{
  61. RuleId: rule.Id,
  62. Rule: rule,
  63. ActionCh: make(chan ActionSignal),
  64. }
  65. rs.Topology = tp
  66. rs.Run()
  67. return rs, nil
  68. }
  69. }
  70. // UpdateTopo update the rule and the topology AND restart the topology
  71. // Do not need to call restart after update
  72. func (rs *RuleState) UpdateTopo(rule *api.Rule) error {
  73. if tp, err := planner.Plan(rule); err != nil {
  74. return err
  75. } else {
  76. rs.Lock()
  77. defer rs.Unlock()
  78. // Update rule
  79. rs.Rule = rule
  80. rs.topoGraph = nil
  81. // Stop the old topo
  82. if rs.triggered == 1 {
  83. rs.Topology.Cancel()
  84. rs.ActionCh <- ActionSignalStop
  85. // wait a little to make sure the old topo is stopped
  86. time.Sleep(1 * time.Millisecond)
  87. }
  88. // Update the topo and start
  89. rs.Topology = tp
  90. rs.triggered = 1
  91. rs.ActionCh <- ActionSignalStart
  92. return nil
  93. }
  94. }
  95. // Run start to run the two loops, do not access any changeable states
  96. func (rs *RuleState) Run() {
  97. var (
  98. ctx context.Context
  99. cancel context.CancelFunc
  100. )
  101. // action loop, once start never end until the rule is deleted
  102. go func() {
  103. conf.Log.Infof("Start rulestate %s", rs.RuleId)
  104. for {
  105. s, opened := <-rs.ActionCh
  106. if !opened {
  107. conf.Log.Infof("Stop rulestate %s", rs.RuleId)
  108. if cancel != nil {
  109. cancel()
  110. }
  111. return
  112. }
  113. switch s {
  114. case ActionSignalStart:
  115. if ctx != nil {
  116. conf.Log.Warnf("rule %s is already started", rs.RuleId)
  117. } else {
  118. ctx, cancel = context.WithCancel(context.Background())
  119. go rs.runTopo(ctx)
  120. }
  121. case ActionSignalStop:
  122. // Stop the running loop
  123. if cancel != nil {
  124. cancel()
  125. ctx = nil
  126. cancel = nil
  127. } else {
  128. conf.Log.Warnf("rule %s is already stopped", rs.RuleId)
  129. }
  130. }
  131. }
  132. }()
  133. }
  134. func (rs *RuleState) runTopo(ctx context.Context) {
  135. // Load the changeable states once
  136. rs.Lock()
  137. tp := rs.Topology
  138. option := rs.Rule.Options.Restart
  139. rs.Unlock()
  140. if tp == nil {
  141. conf.Log.Warnf("rule %s is not initialized or just stopped", rs.RuleId)
  142. return
  143. }
  144. err := infra.SafeRun(func() error {
  145. count := 0
  146. d := option.Delay
  147. var (
  148. er error
  149. )
  150. for {
  151. select {
  152. case e := <-tp.Open():
  153. er = e
  154. if er != nil { // Only restart rule for errors
  155. tp.GetContext().SetError(er)
  156. conf.Log.Errorf("closing rule %s for error: %v", rs.RuleId, er)
  157. tp.Cancel()
  158. } else { // exit normally
  159. return nil
  160. }
  161. }
  162. if count <= option.Attempts {
  163. if d > option.MaxDelay {
  164. d = option.MaxDelay
  165. }
  166. if option.JitterFactor > 0 {
  167. d = int(math.Round(float64(d) * ((rand.Float64()*2-1)*0.1 + 1)))
  168. conf.Log.Infof("Rule %s will restart with jitterred delay %d", rs.RuleId, d)
  169. } else {
  170. conf.Log.Infof("Rule %s will restart with delay %d", rs.RuleId, d)
  171. }
  172. // retry after delay
  173. select {
  174. case <-time.Tick(time.Duration(d) * time.Millisecond):
  175. break
  176. case <-ctx.Done():
  177. conf.Log.Errorf("stop rule %s retry as cancelled", rs.RuleId)
  178. return nil
  179. }
  180. count++
  181. if option.Multiplier > 0 {
  182. d = option.Delay * int(math.Pow(option.Multiplier, float64(count)))
  183. }
  184. } else {
  185. return er
  186. }
  187. }
  188. })
  189. if err != nil { // Exit after retries
  190. rs.Lock()
  191. // The only change the state by error
  192. rs.triggered = 0
  193. rs.topoGraph = rs.Topology.GetTopo()
  194. rs.Topology = nil
  195. rs.Unlock()
  196. }
  197. }
  198. // The action functions are state machine.
  199. func (rs *RuleState) Start() error {
  200. rs.Lock()
  201. defer rs.Unlock()
  202. switch rs.triggered {
  203. case 1:
  204. return fmt.Errorf("Rule %s is already starting", rs.RuleId)
  205. case -1:
  206. return fmt.Errorf("Rule %s is already deleted", rs.RuleId)
  207. default:
  208. // Start from stop
  209. if rs.Topology == nil {
  210. if tp, err := planner.Plan(rs.Rule); err != nil {
  211. return err
  212. } else {
  213. rs.Topology = tp
  214. }
  215. } // else start after create
  216. rs.triggered = 1
  217. rs.ActionCh <- ActionSignalStart
  218. return nil
  219. }
  220. }
  221. // Stop remove the Topology
  222. func (rs *RuleState) Stop() error {
  223. rs.Lock()
  224. defer rs.Unlock()
  225. switch rs.triggered {
  226. case 0:
  227. return fmt.Errorf("Rule %s is already stopping", rs.RuleId)
  228. case -1:
  229. return fmt.Errorf("Rule %s is already deleted", rs.RuleId)
  230. default:
  231. rs.triggered = 0
  232. rs.Topology.Cancel()
  233. rs.Topology = nil
  234. rs.ActionCh <- ActionSignalStop
  235. return nil
  236. }
  237. }
  238. func (rs *RuleState) Close() error {
  239. rs.Lock()
  240. defer rs.Unlock()
  241. if rs.triggered == 1 && rs.Topology != nil {
  242. rs.Topology.Cancel()
  243. }
  244. rs.triggered = -1
  245. close(rs.ActionCh)
  246. return nil
  247. }
  248. func (rs *RuleState) GetState() (string, error) {
  249. rs.RLock()
  250. defer rs.RUnlock()
  251. result := ""
  252. if rs.Topology == nil {
  253. result = "Stopped: canceled manually or by error."
  254. } else {
  255. c := (*rs.Topology).GetContext()
  256. if c != nil {
  257. err := c.Err()
  258. switch err {
  259. case nil:
  260. result = "Running"
  261. case context.Canceled:
  262. result = "Stopped: canceled by error."
  263. case context.DeadlineExceeded:
  264. result = "Stopped: deadline exceed."
  265. default:
  266. result = fmt.Sprintf("Stopped: %v.", err)
  267. }
  268. } else {
  269. result = "Stopped: no context found."
  270. }
  271. }
  272. return result, nil
  273. }
  274. func (rs *RuleState) GetTopoGraph() *api.PrintableTopo {
  275. rs.RLock()
  276. defer rs.RUnlock()
  277. if rs.topoGraph != nil {
  278. return rs.topoGraph
  279. } else if rs.Topology != nil {
  280. return rs.Topology.GetTopo()
  281. } else {
  282. return nil
  283. }
  284. }