responder.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package checkpoints
  // Responder is the contract for a participant that can be asked to start a
  // checkpoint and that can be identified by name.
  type Responder interface {
  	// GetName returns the name that identifies this responder.
  	GetName() string
  	// TriggerCheckpoint asks the responder to start the checkpoint identified
  	// by checkpointId. A non-nil error reports that it could not be triggered.
  	TriggerCheckpoint(checkpointId int64) error
  }
  6. type ResponderExecutor struct {
  7. responder chan<- *Signal
  8. task StreamTask
  9. }
  10. func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor {
  11. return &ResponderExecutor{
  12. responder: responder,
  13. task: task,
  14. }
  15. }
  16. func (re *ResponderExecutor) GetName() string {
  17. return re.task.GetName()
  18. }
  19. func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error {
  20. ctx := re.task.GetStreamContext()
  21. logger := ctx.GetLogger()
  22. name := re.GetName()
  23. logger.Debugf("Starting checkpoint %d on task %s", checkpointId, name)
  24. //create
  25. barrier := &Barrier{
  26. CheckpointId: checkpointId,
  27. OpId: name,
  28. }
  29. //broadcast barrier
  30. re.task.Broadcast(barrier)
  31. //Save key state to the global state
  32. ctx.Snapshot()
  33. go func() {
  34. state := ACK
  35. err := ctx.SaveState(checkpointId)
  36. if err != nil {
  37. logger.Infof("save checkpoint error %s", err)
  38. state = DEC
  39. }
  40. signal := &Signal{
  41. Message: state,
  42. Barrier: Barrier{CheckpointId: checkpointId, OpId: name},
  43. }
  44. re.responder <- signal
  45. logger.Debugf("Complete checkpoint %d on task %s", checkpointId, name)
  46. }()
  47. return nil
  48. }