ruleState.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rule
  15. import (
  16. "context"
  17. "fmt"
  18. "math"
  19. "math/rand"
  20. "sync"
  21. "time"
  22. "github.com/robfig/cron/v3"
  23. "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/topo"
  25. "github.com/lf-edge/ekuiper/internal/topo/planner"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. "github.com/lf-edge/ekuiper/pkg/infra"
  28. )
  // ActionSignal is a command delivered to a rule's action loop through
// RuleState.ActionCh.
type ActionSignal int

// Signals understood by the action loop.
const (
	// ActionSignalStart asks the action loop to launch the topology loop.
	ActionSignalStart ActionSignal = 0
	// ActionSignalStop asks the action loop to cancel the topology loop.
	ActionSignalStop ActionSignal = 1
)
  34. type cronInterface interface {
  35. Start()
  36. AddFunc(spec string, cmd func()) (cron.EntryID, error)
  37. Remove(id cron.EntryID)
  38. }
  39. var backgroundCron cronInterface
  40. func init() {
  41. if !conf.IsTesting {
  42. backgroundCron = cron.New()
  43. } else {
  44. backgroundCron = &MockCron{}
  45. }
  46. backgroundCron.Start()
  47. }
  // cronStateCtx holds the scheduling state of a rule that is driven by
// backgroundCron (see startScheduleRule / stopScheduleRule).
type cronStateCtx struct {
	// cancel cancels the context watched by the goroutines that stop a
	// scheduled run after its duration; it is called when the schedule is removed.
	cancel context.CancelFunc
	// entryID identifies the rule's job in backgroundCron so it can be removed.
	entryID cron.EntryID
	// isInSchedule reports whether the rule's job is currently registered in backgroundCron.
	isInSchedule bool
	// startFailedCnt counts failed start attempts made by the cron job; GetState reports it.
	startFailedCnt int
	// only used for test
	cron     string
	duration string
}
  // RuleState is created for each rule and runs two loops:
//  1. the action loop (see run), which receives start/stop signals from
//     ActionCh and launches or cancels the topology loop;
//  2. the topology loop (see runTopo), which runs the topology and restarts
//     it on error according to the rule's restart options.
//
// Commands such as start, stop, get status and delete are exposed as methods.
// The loops and the methods share the mutable fields below, which are meant
// to be accessed under the embedded sync.RWMutex.
type RuleState struct {
	// RuleId is constant and never changes.
	RuleId string
	// ActionCh carries start/stop signals to the action loop. It is closed
	// when the rule is deleted (Close), which ends the action loop.
	ActionCh chan ActionSignal
	// Rule is nearly constant; it only changes when the rule is updated.
	Rule *api.Rule
	// Topology is planned from Rule each time the rule is (re)started.
	Topology *topo.Topo
	// triggered: 0 stopped, 1 started, -1 deleted; changed by the action methods
	// and by runTopo when retries are exhausted.
	triggered int
	// topoGraph caches the topology graph when the rule stops on error so the
	// graph stays available after the topology is closed.
	topoGraph *api.PrintableTopo
	sync.RWMutex
	// cronState is the scheduling state for rules run by backgroundCron.
	cronState cronStateCtx
}
  79. // NewRuleState Create and initialize a rule state.
  80. // Errors are possible during plan the topo.
  81. // If error happens return immediately without add it to the registry
  82. func NewRuleState(rule *api.Rule) (*RuleState, error) {
  83. rs := &RuleState{
  84. RuleId: rule.Id,
  85. Rule: rule,
  86. ActionCh: make(chan ActionSignal),
  87. }
  88. rs.run()
  89. if tp, err := planner.Plan(rule); err != nil {
  90. return rs, err
  91. } else {
  92. rs.Topology = tp
  93. return rs, nil
  94. }
  95. }
  96. // UpdateTopo update the rule and the topology AND restart the topology
  97. // Do not need to call restart after update
  98. func (rs *RuleState) UpdateTopo(rule *api.Rule) error {
  99. if _, err := planner.Plan(rule); err != nil {
  100. return err
  101. }
  102. if err := rs.Stop(); err != nil {
  103. return err
  104. }
  105. time.Sleep(1 * time.Millisecond)
  106. rs.Rule = rule
  107. return rs.Start()
  108. }
  109. // Run start to run the two loops, do not access any changeable states
  110. func (rs *RuleState) run() {
  111. var (
  112. ctx context.Context
  113. cancel context.CancelFunc
  114. )
  115. // action loop, once start never end until the rule is deleted
  116. go func() {
  117. conf.Log.Infof("Start rulestate %s", rs.RuleId)
  118. for {
  119. s, opened := <-rs.ActionCh
  120. if !opened {
  121. conf.Log.Infof("Stop rulestate %s", rs.RuleId)
  122. if cancel != nil {
  123. cancel()
  124. }
  125. return
  126. }
  127. switch s {
  128. case ActionSignalStart:
  129. if ctx != nil {
  130. conf.Log.Warnf("rule %s is already started", rs.RuleId)
  131. } else {
  132. ctx, cancel = context.WithCancel(context.Background())
  133. go rs.runTopo(ctx)
  134. }
  135. case ActionSignalStop:
  136. // Stop the running loop
  137. if cancel != nil {
  138. cancel()
  139. ctx = nil
  140. cancel = nil
  141. } else {
  142. conf.Log.Warnf("rule %s is already stopped", rs.RuleId)
  143. }
  144. }
  145. }
  146. }()
  147. }
  148. func (rs *RuleState) runTopo(ctx context.Context) {
  149. // Load the changeable states once
  150. rs.Lock()
  151. tp := rs.Topology
  152. option := rs.Rule.Options.Restart
  153. rs.Unlock()
  154. if tp == nil {
  155. conf.Log.Warnf("rule %s is not initialized or just stopped", rs.RuleId)
  156. return
  157. }
  158. err := infra.SafeRun(func() error {
  159. count := 0
  160. d := option.Delay
  161. var er error
  162. ticker := time.NewTicker(time.Duration(d) * time.Millisecond)
  163. defer ticker.Stop()
  164. for {
  165. select {
  166. case e := <-tp.Open():
  167. er = e
  168. if er != nil { // Only restart rule for errors
  169. tp.GetContext().SetError(er)
  170. conf.Log.Errorf("closing rule %s for error: %v", rs.RuleId, er)
  171. tp.Cancel()
  172. } else { // exit normally
  173. return nil
  174. }
  175. }
  176. if count < option.Attempts {
  177. if d > option.MaxDelay {
  178. d = option.MaxDelay
  179. }
  180. if option.JitterFactor > 0 {
  181. d = int(math.Round(float64(d) * ((rand.Float64()*2-1)*0.1 + 1)))
  182. conf.Log.Infof("Rule %s will restart with jitterred delay %d", rs.RuleId, d)
  183. } else {
  184. conf.Log.Infof("Rule %s will restart with delay %d", rs.RuleId, d)
  185. }
  186. // retry after delay
  187. select {
  188. case <-ticker.C:
  189. break
  190. case <-ctx.Done():
  191. conf.Log.Errorf("stop rule %s retry as cancelled", rs.RuleId)
  192. return nil
  193. }
  194. count++
  195. if option.Multiplier > 0 {
  196. d = option.Delay * int(math.Pow(option.Multiplier, float64(count)))
  197. }
  198. } else {
  199. return er
  200. }
  201. }
  202. })
  203. if err != nil { // Exit after retries
  204. rs.Lock()
  205. // The only change the state by error
  206. if rs.triggered != -1 {
  207. rs.triggered = 0
  208. if rs.Topology != nil {
  209. rs.topoGraph = rs.Topology.GetTopo()
  210. }
  211. rs.ActionCh <- ActionSignalStop
  212. }
  213. rs.Unlock()
  214. }
  215. }
  216. // The action functions are state machine.
  217. func (rs *RuleState) Start() error {
  218. rs.Lock()
  219. defer rs.Unlock()
  220. if rs.triggered == -1 {
  221. return fmt.Errorf("rule %s is already deleted", rs.RuleId)
  222. }
  223. if rs.Rule.IsScheduleRule() {
  224. return rs.startScheduleRule()
  225. }
  226. return rs.start()
  227. }
  228. // startScheduleRule will register the job in the backgroundCron to run.
  229. // Job will do following 2 things:
  230. // 1. start the rule in cron if else the job is already stopped
  231. // 2. after the rule started, start an extract goroutine to stop the rule after specific duration
  232. func (rs *RuleState) startScheduleRule() error {
  233. if rs.cronState.isInSchedule {
  234. return fmt.Errorf("rule %s is already in schedule", rs.RuleId)
  235. }
  236. d, err := time.ParseDuration(rs.Rule.Options.Duration)
  237. if err != nil {
  238. return err
  239. }
  240. var cronCtx context.Context
  241. cronCtx, rs.cronState.cancel = context.WithCancel(context.Background())
  242. entryID, err := backgroundCron.AddFunc(rs.Rule.Options.Cron, func() {
  243. var started bool
  244. var err error
  245. if started, err = func() (bool, error) {
  246. switch backgroundCron.(type) {
  247. case *MockCron:
  248. // skip mutex if this is a unit test
  249. default:
  250. rs.Lock()
  251. defer rs.Unlock()
  252. }
  253. now := conf.GetNow()
  254. var err error
  255. allowed := true
  256. for _, timeRange := range rs.Rule.Options.CronDatetimeRange {
  257. allowed, err = isInScheduleRange(now, timeRange.Begin, timeRange.End)
  258. if err != nil {
  259. return false, err
  260. }
  261. if allowed {
  262. break
  263. }
  264. }
  265. if !allowed {
  266. return false, nil
  267. }
  268. rs.cronState.cron = rs.Rule.Options.Cron
  269. rs.cronState.duration = rs.Rule.Options.Duration
  270. return true, rs.start()
  271. }(); err != nil {
  272. rs.Lock()
  273. rs.cronState.startFailedCnt++
  274. rs.Unlock()
  275. conf.Log.Errorf(err.Error())
  276. return
  277. }
  278. if started {
  279. after := time.After(d)
  280. go func(ctx context.Context) {
  281. select {
  282. case <-after:
  283. rs.Lock()
  284. defer rs.Unlock()
  285. if err := rs.stop(); err != nil {
  286. conf.Log.Errorf("close rule %s failed, err: %v", rs.RuleId, err)
  287. }
  288. return
  289. case <-cronCtx.Done():
  290. return
  291. }
  292. }(cronCtx)
  293. }
  294. })
  295. if err != nil {
  296. return err
  297. }
  298. rs.cronState.isInSchedule = true
  299. rs.cronState.entryID = entryID
  300. return nil
  301. }
  302. func (rs *RuleState) start() error {
  303. if rs.triggered != 1 {
  304. // If the rule has been stopped due to error, the topology is not nil
  305. if rs.Topology != nil {
  306. rs.Topology.Cancel()
  307. }
  308. if tp, err := planner.Plan(rs.Rule); err != nil {
  309. return err
  310. } else {
  311. rs.Topology = tp
  312. }
  313. rs.triggered = 1
  314. }
  315. rs.ActionCh <- ActionSignalStart
  316. return nil
  317. }
  318. // Stop remove the Topology
  319. func (rs *RuleState) Stop() error {
  320. rs.Lock()
  321. defer rs.Unlock()
  322. rs.stopScheduleRule()
  323. return rs.stop()
  324. }
  325. func (rs *RuleState) stopScheduleRule() {
  326. if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
  327. rs.cronState.isInSchedule = false
  328. if rs.cronState.cancel != nil {
  329. rs.cronState.cancel()
  330. }
  331. rs.cronState.startFailedCnt = 0
  332. backgroundCron.Remove(rs.cronState.entryID)
  333. }
  334. }
  335. func (rs *RuleState) stop() error {
  336. if rs.triggered == -1 {
  337. return fmt.Errorf("rule %s is already deleted", rs.RuleId)
  338. }
  339. rs.triggered = 0
  340. if rs.Topology != nil {
  341. rs.Topology.Cancel()
  342. }
  343. rs.ActionCh <- ActionSignalStop
  344. return nil
  345. }
  346. func (rs *RuleState) Close() error {
  347. rs.Lock()
  348. defer rs.Unlock()
  349. if rs.Topology != nil {
  350. rs.Topology.RemoveMetrics()
  351. }
  352. if rs.triggered == 1 && rs.Topology != nil {
  353. rs.Topology.Cancel()
  354. }
  355. rs.triggered = -1
  356. rs.stopScheduleRule()
  357. close(rs.ActionCh)
  358. return nil
  359. }
  360. func (rs *RuleState) GetState() (string, error) {
  361. rs.RLock()
  362. defer rs.RUnlock()
  363. result := ""
  364. if rs.Topology == nil {
  365. result = "Stopped: fail to create the topo."
  366. } else {
  367. c := (*rs.Topology).GetContext()
  368. if c != nil {
  369. err := c.Err()
  370. switch err {
  371. case nil:
  372. result = "Running"
  373. case context.Canceled:
  374. if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
  375. if isAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
  376. result = "Stopped: schedule terminated."
  377. } else {
  378. result = "Stopped: waiting for next schedule."
  379. }
  380. } else {
  381. result = "Stopped: canceled manually."
  382. }
  383. case context.DeadlineExceeded:
  384. result = "Stopped: deadline exceed."
  385. default:
  386. result = fmt.Sprintf("Stopped: %v.", err)
  387. }
  388. } else {
  389. if rs.cronState.isInSchedule {
  390. if isAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
  391. result = "Stopped: schedule terminated."
  392. } else {
  393. result = "Stopped: waiting for next schedule."
  394. }
  395. } else {
  396. result = "Stopped: canceled manually."
  397. }
  398. }
  399. }
  400. if rs.Rule.IsScheduleRule() && rs.cronState.startFailedCnt > 0 {
  401. result = result + fmt.Sprintf(" Start failed count: %v.", rs.cronState.startFailedCnt)
  402. }
  403. return result, nil
  404. }
  405. func (rs *RuleState) GetTopoGraph() *api.PrintableTopo {
  406. rs.RLock()
  407. defer rs.RUnlock()
  408. if rs.topoGraph != nil {
  409. return rs.topoGraph
  410. } else if rs.Topology != nil {
  411. return rs.Topology.GetTopo()
  412. } else {
  413. return nil
  414. }
  415. }
  // layout is the datetime format used for the begin/end values of
// CronDatetimeRange.
const layout = "2006-01-02 15:04:05"

// isInScheduleRange reports whether now lies strictly between start and end,
// both parsed with layout. A parse error of either bound is returned with
// false.
func isInScheduleRange(now time.Time, start string, end string) (bool, error) {
	rangeBegin, err := time.Parse(layout, start)
	if err != nil {
		return false, err
	}
	rangeEnd, err := time.Parse(layout, end)
	if err != nil {
		return false, err
	}
	return now.After(rangeBegin) && now.Before(rangeEnd), nil
}
  433. func isAfterTimeRanges(now time.Time, ranges []api.DatetimeRange) bool {
  434. if len(ranges) < 1 {
  435. return false
  436. }
  437. for _, r := range ranges {
  438. isAfter, err := isAfterTimeRange(now, r.End)
  439. if err != nil || !isAfter {
  440. return false
  441. }
  442. }
  443. return true
  444. }
  445. func isAfterTimeRange(now time.Time, end string) (bool, error) {
  446. e, err := time.Parse(layout, end)
  447. if err != nil {
  448. return false, err
  449. }
  450. return now.After(e), nil
  451. }