responder.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package checkpoints
  2. import "fmt"
  3. type Responder interface {
  4. TriggerCheckpoint(checkpointId int64) error
  5. GetName() string
  6. }
  7. type ResponderExecutor struct {
  8. responder chan<- *Signal
  9. task StreamTask
  10. }
  11. func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor {
  12. return &ResponderExecutor{
  13. responder: responder,
  14. task: task,
  15. }
  16. }
  17. func (re *ResponderExecutor) GetName() string {
  18. return re.task.GetName()
  19. }
  20. func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error {
  21. ctx := re.task.GetStreamContext()
  22. logger := ctx.GetLogger()
  23. sctx, ok := ctx.(StreamCheckpointContext)
  24. if !ok {
  25. return fmt.Errorf("invalid context for checkpoint responder, must be a StreamCheckpointContext")
  26. }
  27. name := re.GetName()
  28. logger.Debugf("Starting checkpoint %d on task %s", checkpointId, name)
  29. //create
  30. barrier := &Barrier{
  31. CheckpointId: checkpointId,
  32. OpId: name,
  33. }
  34. //broadcast barrier
  35. re.task.Broadcast(barrier)
  36. //Save key state to the global state
  37. err := sctx.Snapshot()
  38. if err != nil {
  39. return err
  40. }
  41. go func() {
  42. state := ACK
  43. err := sctx.SaveState(checkpointId)
  44. if err != nil {
  45. logger.Infof("save checkpoint error %s", err)
  46. state = DEC
  47. }
  48. signal := &Signal{
  49. Message: state,
  50. Barrier: Barrier{CheckpointId: checkpointId, OpId: name},
  51. }
  52. re.responder <- signal
  53. logger.Debugf("Complete checkpoint %d on task %s", checkpointId, name)
  54. }()
  55. return nil
  56. }