coordinator.go 6.8 KB


  1. package checkpoints
  2. import (
  3. "github.com/benbjohnson/clock"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "time"
  8. )
  9. type pendingCheckpoint struct {
  10. checkpointId int64
  11. isDiscarded bool
  12. notYetAckTasks map[string]bool
  13. }
  14. func newPendingCheckpoint(checkpointId int64, tasksToWaitFor []Responder) *pendingCheckpoint {
  15. pc := &pendingCheckpoint{checkpointId: checkpointId}
  16. nyat := make(map[string]bool)
  17. for _, r := range tasksToWaitFor {
  18. nyat[r.GetName()] = true
  19. }
  20. pc.notYetAckTasks = nyat
  21. return pc
  22. }
  23. func (c *pendingCheckpoint) ack(opId string) bool {
  24. if c.isDiscarded {
  25. return false
  26. }
  27. delete(c.notYetAckTasks, opId)
  28. //TODO serialize state
  29. return true
  30. }
  31. func (c *pendingCheckpoint) isFullyAck() bool {
  32. return len(c.notYetAckTasks) == 0
  33. }
  34. func (c *pendingCheckpoint) finalize() *completedCheckpoint {
  35. ccp := &completedCheckpoint{checkpointId: c.checkpointId}
  36. return ccp
  37. }
  38. func (c *pendingCheckpoint) dispose(releaseState bool) {
  39. c.isDiscarded = true
  40. }
  // completedCheckpoint is a checkpoint whose awaited tasks have all acked.
  type completedCheckpoint struct {
  	checkpointId int64
  }

  // checkpointStore keeps a bounded history of completed checkpoints, oldest
  // first.
  type checkpointStore struct {
  	maxNum      int
  	checkpoints []*completedCheckpoint
  }

  // add appends c. If the store then holds more than maxNum entries, the
  // oldest entry is evicted.
  func (s *checkpointStore) add(c *completedCheckpoint) {
  	s.checkpoints = append(s.checkpoints, c)
  	if s.maxNum < len(s.checkpoints) {
  		// Evict the oldest entry.
  		s.checkpoints = s.checkpoints[1:]
  	}
  }

  // getLatest returns the most recently added checkpoint, or nil when the
  // store is empty.
  func (s *checkpointStore) getLatest() *completedCheckpoint {
  	n := len(s.checkpoints)
  	if n == 0 {
  		return nil
  	}
  	return s.checkpoints[n-1]
  }
  60. type Coordinator struct {
  61. tasksToTrigger []Responder
  62. tasksToWaitFor []Responder
  63. pendingCheckpoints map[int64]*pendingCheckpoint
  64. completedCheckpoints *checkpointStore
  65. ruleId string
  66. baseInterval int
  67. timeout int
  68. advanceToEndOfEventTime bool
  69. ticker *clock.Ticker //For processing time only
  70. signal chan *Signal
  71. store Store
  72. ctx api.StreamContext
  73. }
  74. func NewCoordinator(ruleId string, sources []StreamTask, operators []NonSourceTask, sinks []NonSourceTask, qos xsql.Qos, store Store, interval int, ctx api.StreamContext) *Coordinator {
  75. signal := make(chan *Signal, 1024)
  76. var allResponders, sourceResponders []Responder
  77. for _, r := range sources {
  78. re := NewResponderExecutor(signal, r)
  79. allResponders = append(allResponders, re)
  80. sourceResponders = append(sourceResponders, re)
  81. }
  82. for _, r := range operators {
  83. re := NewResponderExecutor(signal, r)
  84. handler := createBarrierHandler(re, r.GetInputCount(), qos)
  85. r.InitCheckpoint(handler, qos)
  86. allResponders = append(allResponders, re)
  87. }
  88. for _, r := range sinks {
  89. re := NewResponderExecutor(signal, r)
  90. handler := NewBarrierTracker(re, r.GetInputCount())
  91. r.InitCheckpoint(handler, qos)
  92. allResponders = append(allResponders, re)
  93. }
  94. //5 minutes by default
  95. if interval <= 0 {
  96. interval = 5000
  97. }
  98. return &Coordinator{
  99. tasksToTrigger: sourceResponders,
  100. tasksToWaitFor: allResponders,
  101. pendingCheckpoints: make(map[int64]*pendingCheckpoint),
  102. completedCheckpoints: &checkpointStore{
  103. maxNum: 3,
  104. },
  105. ruleId: ruleId,
  106. signal: signal,
  107. baseInterval: interval,
  108. timeout: 200000,
  109. store: store,
  110. ctx: ctx,
  111. }
  112. }
  113. func createBarrierHandler(re Responder, inputCount int, qos xsql.Qos) BarrierHandler {
  114. if qos == xsql.AtLeastOnce {
  115. return NewBarrierTracker(re, inputCount)
  116. } else if qos == xsql.ExactlyOnce {
  117. return NewBarrierAligner(re, inputCount)
  118. } else {
  119. return nil
  120. }
  121. }
  122. func (c *Coordinator) Activate() error {
  123. logger := c.ctx.GetLogger()
  124. if c.ticker != nil {
  125. c.ticker.Stop()
  126. }
  127. c.ticker = common.GetTicker(c.baseInterval)
  128. tc := c.ticker.C
  129. go func() {
  130. for {
  131. select {
  132. case <-tc:
  133. //trigger checkpoint
  134. //TODO pose max attempt and min pause check for consequent pendingCheckpoints
  135. // TODO Check if all tasks are running
  136. //Create a pending checkpoint
  137. checkpointId := common.GetNowInMilli()
  138. checkpoint := newPendingCheckpoint(checkpointId, c.tasksToWaitFor)
  139. logger.Debugf("Create checkpoint %d", checkpointId)
  140. c.pendingCheckpoints[checkpointId] = checkpoint
  141. //Let the sources send out a barrier
  142. for _, r := range c.tasksToTrigger {
  143. go func() {
  144. if err := r.TriggerCheckpoint(checkpointId); err != nil {
  145. logger.Infof("Fail to trigger checkpoint for source %s with error %v", r.GetName(), err)
  146. c.cancel(checkpointId)
  147. } else {
  148. time.Sleep(time.Duration(c.timeout) * time.Microsecond)
  149. c.cancel(checkpointId)
  150. }
  151. }()
  152. }
  153. case s := <-c.signal:
  154. switch s.Message {
  155. case STOP:
  156. logger.Debug("Stop checkpoint scheduler")
  157. if c.ticker != nil {
  158. c.ticker.Stop()
  159. }
  160. return
  161. case ACK:
  162. logger.Debugf("Receive ack from %s for checkpoint %d", s.OpId, s.CheckpointId)
  163. if checkpoint, ok := c.pendingCheckpoints[s.CheckpointId]; ok {
  164. checkpoint.ack(s.OpId)
  165. if checkpoint.isFullyAck() {
  166. c.complete(s.CheckpointId)
  167. }
  168. } else {
  169. logger.Debugf("Receive ack from %s for non existing checkpoint %d", s.OpId, s.CheckpointId)
  170. }
  171. case DEC:
  172. logger.Debugf("Receive dec from %s for checkpoint %d", s.OpId, s.CheckpointId)
  173. c.cancel(s.CheckpointId)
  174. }
  175. case <-c.ctx.Done():
  176. logger.Infoln("Cancelling coordinator....")
  177. if c.ticker != nil {
  178. c.ticker.Stop()
  179. }
  180. return
  181. }
  182. }
  183. }()
  184. return nil
  185. }
  186. func (c *Coordinator) Deactivate() error {
  187. if c.ticker != nil {
  188. c.ticker.Stop()
  189. }
  190. c.signal <- &Signal{Message: STOP}
  191. return nil
  192. }
  193. func (c *Coordinator) cancel(checkpointId int64) {
  194. logger := c.ctx.GetLogger()
  195. if checkpoint, ok := c.pendingCheckpoints[checkpointId]; ok {
  196. delete(c.pendingCheckpoints, checkpointId)
  197. checkpoint.dispose(true)
  198. } else {
  199. logger.Debugf("Cancel for non existing checkpoint %d. Just ignored", checkpointId)
  200. }
  201. }
  202. func (c *Coordinator) complete(checkpointId int64) {
  203. logger := c.ctx.GetLogger()
  204. if ccp, ok := c.pendingCheckpoints[checkpointId]; ok {
  205. err := c.store.SaveCheckpoint(checkpointId)
  206. if err != nil {
  207. logger.Infof("Cannot save checkpoint %d due to storage error: %v", checkpointId, err)
  208. //TODO handle checkpoint error
  209. return
  210. }
  211. c.completedCheckpoints.add(ccp.finalize())
  212. delete(c.pendingCheckpoints, checkpointId)
  213. //Drop the previous pendingCheckpoints
  214. for cid, cp := range c.pendingCheckpoints {
  215. if cid < checkpointId {
  216. //TODO revisit how to abort a checkpoint, discard callback
  217. cp.isDiscarded = true
  218. delete(c.pendingCheckpoints, cid)
  219. }
  220. }
  221. logger.Debugf("Totally complete checkpoint %d", checkpointId)
  222. } else {
  223. logger.Infof("Cannot find checkpoint %d to complete", checkpointId)
  224. }
  225. }
  226. //For testing
  227. func (c *Coordinator) GetCompleteCount() int {
  228. return len(c.completedCheckpoints.checkpoints)
  229. }
  230. func (c *Coordinator) GetLatest() int64 {
  231. return c.completedCheckpoints.getLatest().checkpointId
  232. }