responder.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package checkpoints
  // Responder is the contract for a named component that can be asked to start
  // a checkpoint.
  type Responder interface {
  	// GetName returns the responder's name.
  	GetName() string
  	// TriggerCheckpoint asks the responder to start the checkpoint identified
  	// by checkpointId and reports whether the request could be started.
  	TriggerCheckpoint(checkpointId int64) error
  }
  6. type ResponderExecutor struct {
  7. responder chan<- *Signal
  8. task StreamTask
  9. }
  10. func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor {
  11. return &ResponderExecutor{
  12. responder: responder,
  13. task: task,
  14. }
  15. }
  16. func (re *ResponderExecutor) GetName() string {
  17. return re.task.GetName()
  18. }
  19. func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error {
  20. ctx := re.task.GetStreamContext()
  21. logger := ctx.GetLogger()
  22. name := re.GetName()
  23. logger.Debugf("Starting checkpoint %d on task %s", checkpointId, name)
  24. //create
  25. barrier := &Barrier{
  26. CheckpointId: checkpointId,
  27. OpId: name,
  28. }
  29. //broadcast barrier
  30. re.task.Broadcast(barrier)
  31. //Save key state to the global state
  32. go func() {
  33. state := ACK
  34. err := ctx.SaveState(checkpointId)
  35. if err != nil {
  36. logger.Infof("save checkpoint error %s", err)
  37. state = DEC
  38. }
  39. signal := &Signal{
  40. Message: state,
  41. Barrier: Barrier{CheckpointId: checkpointId, OpId: name},
  42. }
  43. re.responder <- signal
  44. logger.Debugf("Complete checkpoint %d on task %s", checkpointId, name)
  45. }()
  46. return nil
  47. }