coordinator.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package checkpoint
  15. import (
  16. "sync"
  17. "github.com/benbjohnson/clock"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. "github.com/lf-edge/ekuiper/pkg/infra"
  22. )
  23. type pendingCheckpoint struct {
  24. checkpointId int64
  25. isDiscarded bool
  26. notYetAckTasks map[string]bool
  27. }
  28. func newPendingCheckpoint(checkpointId int64, tasksToWaitFor []Responder) *pendingCheckpoint {
  29. pc := &pendingCheckpoint{checkpointId: checkpointId}
  30. nyat := make(map[string]bool)
  31. for _, r := range tasksToWaitFor {
  32. nyat[r.GetName()] = true
  33. }
  34. pc.notYetAckTasks = nyat
  35. return pc
  36. }
  37. func (c *pendingCheckpoint) ack(opId string) bool {
  38. if c.isDiscarded {
  39. return false
  40. }
  41. delete(c.notYetAckTasks, opId)
  42. // TODO serialize state
  43. return true
  44. }
  45. func (c *pendingCheckpoint) isFullyAck() bool {
  46. return len(c.notYetAckTasks) == 0
  47. }
  48. func (c *pendingCheckpoint) finalize() *completedCheckpoint {
  49. ccp := &completedCheckpoint{checkpointId: c.checkpointId}
  50. return ccp
  51. }
  52. func (c *pendingCheckpoint) dispose(_ bool) {
  53. c.isDiscarded = true
  54. }
  55. type completedCheckpoint struct {
  56. checkpointId int64
  57. }
  58. type checkpointStore struct {
  59. maxNum int
  60. checkpoints []*completedCheckpoint
  61. }
  62. func (s *checkpointStore) add(c *completedCheckpoint) {
  63. s.checkpoints = append(s.checkpoints, c)
  64. if len(s.checkpoints) > s.maxNum {
  65. s.checkpoints = s.checkpoints[1:]
  66. }
  67. }
  68. func (s *checkpointStore) getLatest() *completedCheckpoint {
  69. if len(s.checkpoints) > 0 {
  70. return s.checkpoints[len(s.checkpoints)-1]
  71. }
  72. return nil
  73. }
  74. type Coordinator struct {
  75. tasksToTrigger []Responder
  76. tasksToWaitFor []Responder
  77. sinkTasks []SinkTask
  78. pendingCheckpoints *sync.Map
  79. completedCheckpoints *checkpointStore
  80. ruleId string
  81. baseInterval int
  82. cleanThreshold int
  83. advanceToEndOfEventTime bool
  84. ticker *clock.Ticker // For processing time only
  85. signal chan *Signal
  86. store api.Store
  87. ctx api.StreamContext
  88. activated bool
  89. }
  90. func NewCoordinator(ruleId string, sources []StreamTask, operators []NonSourceTask, sinks []SinkTask, qos api.Qos, store api.Store, interval int, ctx api.StreamContext) *Coordinator {
  91. logger := ctx.GetLogger()
  92. logger.Infof("create new coordinator for rule %s", ruleId)
  93. signal := make(chan *Signal, 1024)
  94. var allResponders, sourceResponders []Responder
  95. for _, r := range sources {
  96. r.SetQos(qos)
  97. re := NewResponderExecutor(signal, r)
  98. allResponders = append(allResponders, re)
  99. sourceResponders = append(sourceResponders, re)
  100. }
  101. for _, r := range operators {
  102. r.SetQos(qos)
  103. re := NewResponderExecutor(signal, r)
  104. handler := createBarrierHandler(re, r.GetInputCount(), qos)
  105. r.SetBarrierHandler(handler)
  106. allResponders = append(allResponders, re)
  107. }
  108. for _, r := range sinks {
  109. r.SetQos(qos)
  110. re := NewResponderExecutor(signal, r)
  111. handler := NewBarrierTracker(re, r.GetInputCount())
  112. r.SetBarrierHandler(handler)
  113. allResponders = append(allResponders, re)
  114. }
  115. // 5 minutes by default
  116. if interval <= 0 {
  117. interval = 300000
  118. }
  119. return &Coordinator{
  120. tasksToTrigger: sourceResponders,
  121. tasksToWaitFor: allResponders,
  122. sinkTasks: sinks,
  123. pendingCheckpoints: new(sync.Map),
  124. completedCheckpoints: &checkpointStore{
  125. maxNum: 3,
  126. },
  127. ruleId: ruleId,
  128. signal: signal,
  129. baseInterval: interval,
  130. store: store,
  131. ctx: ctx,
  132. cleanThreshold: 100,
  133. }
  134. }
  135. func createBarrierHandler(re Responder, inputCount int, qos api.Qos) BarrierHandler {
  136. if qos == api.AtLeastOnce {
  137. return NewBarrierTracker(re, inputCount)
  138. } else if qos == api.ExactlyOnce {
  139. return NewBarrierAligner(re, inputCount)
  140. } else {
  141. return nil
  142. }
  143. }
  144. func (c *Coordinator) Activate() error {
  145. logger := c.ctx.GetLogger()
  146. logger.Infof("Start checkpoint coordinator for rule %s at %d", c.ruleId, conf.GetNowInMilli())
  147. if c.ticker != nil {
  148. c.ticker.Stop()
  149. }
  150. c.ticker = conf.GetTicker(int64(c.baseInterval))
  151. tc := c.ticker.C
  152. go func() {
  153. err := infra.SafeRun(func() error {
  154. c.activated = true
  155. toBeClean := 0
  156. for {
  157. select {
  158. case n := <-tc:
  159. // trigger checkpoint
  160. // TODO pose max attempt and min pause check for consequent pendingCheckpoints
  161. // TODO Check if all tasks are running
  162. // Create a pending checkpoint
  163. checkpointId := cast.TimeToUnixMilli(n)
  164. checkpoint := newPendingCheckpoint(checkpointId, c.tasksToWaitFor)
  165. logger.Debugf("Create checkpoint %d", checkpointId)
  166. c.pendingCheckpoints.Store(checkpointId, checkpoint)
  167. // Let the sources send out a barrier
  168. for _, r := range c.tasksToTrigger {
  169. go func(t Responder) {
  170. if err := t.TriggerCheckpoint(checkpointId); err != nil {
  171. logger.Infof("Fail to trigger checkpoint for source %s with error %v, cancel it", t.GetName(), err)
  172. c.cancel(checkpointId)
  173. }
  174. }(r)
  175. }
  176. toBeClean++
  177. if toBeClean >= c.cleanThreshold {
  178. c.store.Clean()
  179. toBeClean = 0
  180. }
  181. case s := <-c.signal:
  182. switch s.Message {
  183. case STOP:
  184. logger.Debug("Stop checkpoint scheduler")
  185. if c.ticker != nil {
  186. c.ticker.Stop()
  187. }
  188. return nil
  189. case ACK:
  190. logger.Debugf("Receive ack from %s for checkpoint %d", s.OpId, s.CheckpointId)
  191. if cp, ok := c.pendingCheckpoints.Load(s.CheckpointId); ok {
  192. checkpoint := cp.(*pendingCheckpoint)
  193. checkpoint.ack(s.OpId)
  194. if checkpoint.isFullyAck() {
  195. c.complete(s.CheckpointId)
  196. }
  197. } else {
  198. logger.Debugf("Receive ack from %s for non existing checkpoint %d", s.OpId, s.CheckpointId)
  199. }
  200. case DEC:
  201. logger.Debugf("Receive dec from %s for checkpoint %d, cancel it", s.OpId, s.CheckpointId)
  202. c.cancel(s.CheckpointId)
  203. }
  204. case <-c.ctx.Done():
  205. logger.Infoln("Cancelling coordinator....")
  206. if c.ticker != nil {
  207. c.ticker.Stop()
  208. logger.Infoln("Stop coordinator ticker")
  209. }
  210. return nil
  211. }
  212. }
  213. })
  214. logger.Error(err)
  215. }()
  216. return nil
  217. }
  218. func (c *Coordinator) Deactivate() error {
  219. if c.ticker != nil {
  220. c.ticker.Stop()
  221. }
  222. c.signal <- &Signal{Message: STOP}
  223. return nil
  224. }
  225. func (c *Coordinator) cancel(checkpointId int64) {
  226. logger := c.ctx.GetLogger()
  227. if checkpoint, ok := c.pendingCheckpoints.Load(checkpointId); ok {
  228. c.pendingCheckpoints.Delete(checkpointId)
  229. checkpoint.(*pendingCheckpoint).dispose(true)
  230. } else {
  231. logger.Debugf("Cancel for non existing checkpoint %d. Just ignored", checkpointId)
  232. }
  233. }
  234. func (c *Coordinator) complete(checkpointId int64) {
  235. logger := c.ctx.GetLogger()
  236. if ccp, ok := c.pendingCheckpoints.Load(checkpointId); ok {
  237. err := c.store.SaveCheckpoint(checkpointId)
  238. if err != nil {
  239. logger.Infof("Cannot save checkpoint %d due to storage error: %v", checkpointId, err)
  240. // TODO handle checkpoint error
  241. return
  242. }
  243. c.completedCheckpoints.add(ccp.(*pendingCheckpoint).finalize())
  244. c.pendingCheckpoints.Delete(checkpointId)
  245. // Drop the previous pendingCheckpoints
  246. c.pendingCheckpoints.Range(func(a1 interface{}, a2 interface{}) bool {
  247. cid := a1.(int64)
  248. cp := a2.(*pendingCheckpoint)
  249. if cid < checkpointId {
  250. // TODO revisit how to abort a checkpoint, discard callback
  251. cp.isDiscarded = true
  252. c.pendingCheckpoints.Delete(cid)
  253. }
  254. return true
  255. })
  256. logger.Debugf("Totally complete checkpoint %d", checkpointId)
  257. } else {
  258. logger.Infof("Cannot find checkpoint %d to complete", checkpointId)
  259. }
  260. }
  261. // For testing
  262. func (c *Coordinator) GetCompleteCount() int {
  263. return len(c.completedCheckpoints.checkpoints)
  264. }
  265. func (c *Coordinator) GetLatest() int64 {
  266. return c.completedCheckpoints.getLatest().checkpointId
  267. }
  268. func (c *Coordinator) IsActivated() bool {
  269. return c.activated
  270. }