ruleState.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rule
  15. import (
  16. "context"
  17. "fmt"
  18. "math"
  19. "math/rand"
  20. "strings"
  21. "sync"
  22. "time"
  23. "github.com/robfig/cron/v3"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/topo"
  26. "github.com/lf-edge/ekuiper/internal/topo/planner"
  27. "github.com/lf-edge/ekuiper/pkg/api"
  28. "github.com/lf-edge/ekuiper/pkg/infra"
  29. )
  30. type ActionSignal int
  31. const (
  32. ActionSignalStart ActionSignal = iota
  33. ActionSignalStop
  34. )
  35. type cronInterface interface {
  36. Start()
  37. AddFunc(spec string, cmd func()) (cron.EntryID, error)
  38. Remove(id cron.EntryID)
  39. }
  40. var backgroundCron cronInterface
  41. func init() {
  42. if !conf.IsTesting {
  43. backgroundCron = cron.New()
  44. } else {
  45. backgroundCron = &MockCron{}
  46. }
  47. backgroundCron.Start()
  48. }
  49. type cronStateCtx struct {
  50. cancel context.CancelFunc
  51. entryID cron.EntryID
  52. // isInSchedule indicates the current rule is in scheduled in backgroundCron
  53. isInSchedule bool
  54. startFailedCnt int
  55. // only used for test
  56. cron string
  57. duration string
  58. }
  59. /*********
  60. * RuleState is created for each rule. Each ruleState runs two loops:
  61. * 1. action event loop to accept commands, such as start, stop, getStatus, delete
  62. * 2. topo running loop
  63. * Both loops need to access the status, so lock is needed
  64. */
  65. type RuleState struct {
  66. // Constant, never change. Channel to send signals to manage connection retry. When deleting the rule, close it.
  67. RuleId string
  68. ActionCh chan ActionSignal
  69. // Nearly constant, only change when update the rule
  70. Rule *api.Rule
  71. // States, create through rule in each rule start
  72. Topology *topo.Topo
  73. // 0 stop, 1 start, -1 delete, changed in actions
  74. triggered int
  75. // temporary storage for topo graph to make sure even rule close, the graph is still available
  76. topoGraph *api.PrintableTopo
  77. sync.RWMutex
  78. cronState cronStateCtx
  79. }
  80. // NewRuleState Create and initialize a rule state.
  81. // Errors are possible during plan the topo.
  82. // If error happens return immediately without add it to the registry
  83. func NewRuleState(rule *api.Rule) (*RuleState, error) {
  84. rs := &RuleState{
  85. RuleId: rule.Id,
  86. Rule: rule,
  87. ActionCh: make(chan ActionSignal),
  88. }
  89. rs.run()
  90. if tp, err := planner.Plan(rule); err != nil {
  91. return rs, err
  92. } else {
  93. rs.Topology = tp
  94. return rs, nil
  95. }
  96. }
  97. // UpdateTopo update the rule and the topology AND restart the topology
  98. // Do not need to call restart after update
  99. func (rs *RuleState) UpdateTopo(rule *api.Rule) error {
  100. if _, err := planner.Plan(rule); err != nil {
  101. return err
  102. }
  103. if err := rs.Stop(); err != nil {
  104. return err
  105. }
  106. time.Sleep(1 * time.Millisecond)
  107. rs.Rule = rule
  108. return rs.Start()
  109. }
  110. // Run start to run the two loops, do not access any changeable states
  111. func (rs *RuleState) run() {
  112. var (
  113. ctx context.Context
  114. cancel context.CancelFunc
  115. )
  116. // action loop, once start never end until the rule is deleted
  117. go func() {
  118. conf.Log.Infof("Start rulestate %s", rs.RuleId)
  119. for {
  120. s, opened := <-rs.ActionCh
  121. if !opened {
  122. conf.Log.Infof("Stop rulestate %s", rs.RuleId)
  123. if cancel != nil {
  124. cancel()
  125. }
  126. return
  127. }
  128. switch s {
  129. case ActionSignalStart:
  130. if ctx != nil {
  131. conf.Log.Warnf("rule %s is already started", rs.RuleId)
  132. } else {
  133. ctx, cancel = context.WithCancel(context.Background())
  134. go rs.runTopo(ctx)
  135. }
  136. case ActionSignalStop:
  137. // Stop the running loop
  138. if cancel != nil {
  139. cancel()
  140. ctx = nil
  141. cancel = nil
  142. } else {
  143. conf.Log.Warnf("rule %s is already stopped", rs.RuleId)
  144. }
  145. }
  146. }
  147. }()
  148. }
  149. func (rs *RuleState) runTopo(ctx context.Context) {
  150. // Load the changeable states once
  151. rs.Lock()
  152. tp := rs.Topology
  153. option := rs.Rule.Options.Restart
  154. rs.Unlock()
  155. if tp == nil {
  156. conf.Log.Warnf("rule %s is not initialized or just stopped", rs.RuleId)
  157. return
  158. }
  159. err := infra.SafeRun(func() error {
  160. count := 0
  161. d := option.Delay
  162. var er error
  163. ticker := time.NewTicker(time.Duration(d) * time.Millisecond)
  164. defer ticker.Stop()
  165. for {
  166. select {
  167. case e := <-tp.Open():
  168. er = e
  169. if er != nil { // Only restart rule for errors
  170. tp.GetContext().SetError(er)
  171. conf.Log.Errorf("closing rule %s for error: %v", rs.RuleId, er)
  172. tp.Cancel()
  173. } else { // exit normally
  174. return nil
  175. }
  176. }
  177. if count < option.Attempts {
  178. if d > option.MaxDelay {
  179. d = option.MaxDelay
  180. }
  181. if option.JitterFactor > 0 {
  182. d = int(math.Round(float64(d) * ((rand.Float64()*2-1)*0.1 + 1)))
  183. conf.Log.Infof("Rule %s will restart with jitterred delay %d", rs.RuleId, d)
  184. } else {
  185. conf.Log.Infof("Rule %s will restart with delay %d", rs.RuleId, d)
  186. }
  187. // retry after delay
  188. select {
  189. case <-ticker.C:
  190. break
  191. case <-ctx.Done():
  192. conf.Log.Errorf("stop rule %s retry as cancelled", rs.RuleId)
  193. return nil
  194. }
  195. count++
  196. if option.Multiplier > 0 {
  197. d = option.Delay * int(math.Pow(option.Multiplier, float64(count)))
  198. }
  199. } else {
  200. return er
  201. }
  202. }
  203. })
  204. if err != nil { // Exit after retries
  205. rs.Lock()
  206. // The only change the state by error
  207. if rs.triggered != -1 {
  208. rs.triggered = 0
  209. if rs.Topology != nil {
  210. rs.topoGraph = rs.Topology.GetTopo()
  211. }
  212. rs.ActionCh <- ActionSignalStop
  213. }
  214. rs.Unlock()
  215. }
  216. }
  217. // The action functions are state machine.
  218. func (rs *RuleState) Start() error {
  219. rs.Lock()
  220. defer rs.Unlock()
  221. if rs.triggered == -1 {
  222. return fmt.Errorf("rule %s is already deleted", rs.RuleId)
  223. }
  224. if rs.Rule.IsScheduleRule() {
  225. return rs.startScheduleRule()
  226. }
  227. return rs.start()
  228. }
  229. // startScheduleRule will register the job in the backgroundCron to run.
  230. // Job will do following 2 things:
  231. // 1. start the rule in cron if else the job is already stopped
  232. // 2. after the rule started, start an extract goroutine to stop the rule after specific duration
  233. func (rs *RuleState) startScheduleRule() error {
  234. if rs.cronState.isInSchedule {
  235. return fmt.Errorf("rule %s is already in schedule", rs.RuleId)
  236. }
  237. d, err := time.ParseDuration(rs.Rule.Options.Duration)
  238. if err != nil {
  239. return err
  240. }
  241. var cronCtx context.Context
  242. cronCtx, rs.cronState.cancel = context.WithCancel(context.Background())
  243. now := conf.GetNow()
  244. isInRunningSchedule, remainedDuration, err := rs.isInRunningSchedule(now, d)
  245. if err != nil {
  246. return err
  247. }
  248. if isInRunningSchedule {
  249. if err := rs.runScheduleRule(); err != nil {
  250. return err
  251. }
  252. rs.stopAfterDuration(remainedDuration, cronCtx)
  253. }
  254. entryID, err := backgroundCron.AddFunc(rs.Rule.Options.Cron, func() {
  255. var started bool
  256. var err error
  257. if started, err = func() (bool, error) {
  258. switch backgroundCron.(type) {
  259. case *MockCron:
  260. // skip mutex if this is a unit test
  261. default:
  262. rs.Lock()
  263. defer rs.Unlock()
  264. }
  265. now := conf.GetNow()
  266. allowed, err := rs.isInAllowedTimeRange(now)
  267. if err != nil {
  268. return false, err
  269. }
  270. if !allowed {
  271. return false, nil
  272. }
  273. rs.cronState.cron = rs.Rule.Options.Cron
  274. rs.cronState.duration = rs.Rule.Options.Duration
  275. return true, rs.start()
  276. }(); err != nil {
  277. rs.Lock()
  278. rs.cronState.startFailedCnt++
  279. rs.Unlock()
  280. conf.Log.Errorf(err.Error())
  281. return
  282. }
  283. if started {
  284. rs.stopAfterDuration(d, cronCtx)
  285. }
  286. })
  287. if err != nil {
  288. return err
  289. }
  290. rs.cronState.isInSchedule = true
  291. rs.cronState.entryID = entryID
  292. return nil
  293. }
  294. func (rs *RuleState) runScheduleRule() error {
  295. rs.Lock()
  296. defer rs.Unlock()
  297. rs.cronState.cron = rs.Rule.Options.Cron
  298. rs.cronState.duration = rs.Rule.Options.Duration
  299. err := rs.start()
  300. if err != nil {
  301. return err
  302. }
  303. return nil
  304. }
  305. func (rs *RuleState) stopAfterDuration(d time.Duration, cronCtx context.Context) {
  306. after := time.After(d)
  307. go func(ctx context.Context) {
  308. select {
  309. case <-after:
  310. rs.Lock()
  311. defer rs.Unlock()
  312. if err := rs.stop(); err != nil {
  313. conf.Log.Errorf("close rule %s failed, err: %v", rs.RuleId, err)
  314. }
  315. return
  316. case <-cronCtx.Done():
  317. return
  318. }
  319. }(cronCtx)
  320. }
  321. func (rs *RuleState) start() error {
  322. if rs.triggered != 1 {
  323. // If the rule has been stopped due to error, the topology is not nil
  324. if rs.Topology != nil {
  325. rs.Topology.Cancel()
  326. }
  327. if tp, err := planner.Plan(rs.Rule); err != nil {
  328. return err
  329. } else {
  330. rs.Topology = tp
  331. }
  332. rs.triggered = 1
  333. }
  334. rs.ActionCh <- ActionSignalStart
  335. return nil
  336. }
  337. // Stop remove the Topology
  338. func (rs *RuleState) Stop() error {
  339. rs.Lock()
  340. defer rs.Unlock()
  341. rs.stopScheduleRule()
  342. return rs.stop()
  343. }
  344. func (rs *RuleState) stopScheduleRule() {
  345. if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
  346. rs.cronState.isInSchedule = false
  347. if rs.cronState.cancel != nil {
  348. rs.cronState.cancel()
  349. }
  350. rs.cronState.startFailedCnt = 0
  351. backgroundCron.Remove(rs.cronState.entryID)
  352. }
  353. }
  354. func (rs *RuleState) stop() error {
  355. if rs.triggered == -1 {
  356. return fmt.Errorf("rule %s is already deleted", rs.RuleId)
  357. }
  358. rs.triggered = 0
  359. if rs.Topology != nil {
  360. rs.Topology.Cancel()
  361. }
  362. rs.ActionCh <- ActionSignalStop
  363. return nil
  364. }
  365. func (rs *RuleState) Close() error {
  366. rs.Lock()
  367. defer rs.Unlock()
  368. if rs.Topology != nil {
  369. rs.Topology.RemoveMetrics()
  370. }
  371. if rs.triggered == 1 && rs.Topology != nil {
  372. rs.Topology.Cancel()
  373. }
  374. rs.triggered = -1
  375. rs.stopScheduleRule()
  376. close(rs.ActionCh)
  377. return nil
  378. }
  379. func (rs *RuleState) GetState() (string, error) {
  380. rs.RLock()
  381. defer rs.RUnlock()
  382. result := ""
  383. if rs.Topology == nil {
  384. result = "Stopped: fail to create the topo."
  385. } else {
  386. c := (*rs.Topology).GetContext()
  387. if c != nil {
  388. err := c.Err()
  389. switch err {
  390. case nil:
  391. result = "Running"
  392. case context.Canceled:
  393. if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
  394. if isAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
  395. result = "Stopped: schedule terminated."
  396. } else {
  397. result = "Stopped: waiting for next schedule."
  398. }
  399. } else {
  400. result = "Stopped: canceled manually."
  401. }
  402. case context.DeadlineExceeded:
  403. result = "Stopped: deadline exceed."
  404. default:
  405. result = fmt.Sprintf("Stopped: %v.", err)
  406. }
  407. } else {
  408. if rs.cronState.isInSchedule {
  409. if isAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
  410. result = "Stopped: schedule terminated."
  411. } else {
  412. result = "Stopped: waiting for next schedule."
  413. }
  414. } else {
  415. result = "Stopped: canceled manually."
  416. }
  417. }
  418. }
  419. if rs.Rule.IsScheduleRule() && rs.cronState.startFailedCnt > 0 {
  420. result = result + fmt.Sprintf(" Start failed count: %v.", rs.cronState.startFailedCnt)
  421. }
  422. return result, nil
  423. }
  424. func (rs *RuleState) GetTopoGraph() *api.PrintableTopo {
  425. rs.RLock()
  426. defer rs.RUnlock()
  427. if rs.topoGraph != nil {
  428. return rs.topoGraph
  429. } else if rs.Topology != nil {
  430. return rs.Topology.GetTopo()
  431. } else {
  432. return nil
  433. }
  434. }
  435. const layout = "2006-01-02 15:04:05"
  436. func isInScheduleRange(now time.Time, start string, end string) (bool, error) {
  437. s, err := time.Parse(layout, start)
  438. if err != nil {
  439. return false, err
  440. }
  441. e, err := time.Parse(layout, end)
  442. if err != nil {
  443. return false, err
  444. }
  445. isBefore := s.Before(now)
  446. isAfter := e.After(now)
  447. if isBefore && isAfter {
  448. return true, nil
  449. }
  450. return false, nil
  451. }
  452. func isAfterTimeRanges(now time.Time, ranges []api.DatetimeRange) bool {
  453. if len(ranges) < 1 {
  454. return false
  455. }
  456. for _, r := range ranges {
  457. isAfter, err := isAfterTimeRange(now, r.End)
  458. if err != nil || !isAfter {
  459. return false
  460. }
  461. }
  462. return true
  463. }
  464. func isAfterTimeRange(now time.Time, end string) (bool, error) {
  465. e, err := time.Parse(layout, end)
  466. if err != nil {
  467. return false, err
  468. }
  469. return now.After(e), nil
  470. }
  471. func (rs *RuleState) isInRunningSchedule(now time.Time, d time.Duration) (bool, time.Duration, error) {
  472. allowed, err := rs.isInAllowedTimeRange(now)
  473. if err != nil {
  474. return false, 0, err
  475. }
  476. if !allowed {
  477. return false, 0, nil
  478. }
  479. cronExpr := rs.Rule.Options.Cron
  480. if strings.HasPrefix(cronExpr, "mock") {
  481. return false, 0, nil
  482. }
  483. return isInRunningSchedule(cronExpr, now, d)
  484. }
  485. func (rs *RuleState) isInAllowedTimeRange(now time.Time) (bool, error) {
  486. allowed := true
  487. var err error
  488. for _, timeRange := range rs.Rule.Options.CronDatetimeRange {
  489. allowed, err = isInScheduleRange(now, timeRange.Begin, timeRange.End)
  490. if err != nil {
  491. return false, err
  492. }
  493. if allowed {
  494. break
  495. }
  496. }
  497. return allowed, nil
  498. }
  499. // isInRunningSchedule checks whether the rule should be running, eg:
  500. // If the duration is 10min, and cron is "0 0 * * *", and the current time is 00:00:02
  501. // And the rule should be started immediately instead of checking it on the next day.
  502. func isInRunningSchedule(cronExpr string, now time.Time, d time.Duration) (bool, time.Duration, error) {
  503. s, err := cron.ParseStandard(cronExpr)
  504. if err != nil {
  505. return false, 0, err
  506. }
  507. previousSchedule := s.Next(now.Add(-d))
  508. if now.After(previousSchedule) && now.Before(previousSchedule.Add(d)) {
  509. return true, previousSchedule.Add(d).Sub(now), nil
  510. }
  511. return false, 0, nil
  512. }