responder.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
// Copyright 2021 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package checkpoint
  15. import "fmt"
  16. type Responder interface {
  17. TriggerCheckpoint(checkpointId int64) error
  18. GetName() string
  19. }
  20. type ResponderExecutor struct {
  21. responder chan<- *Signal
  22. task StreamTask
  23. }
  24. func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor {
  25. return &ResponderExecutor{
  26. responder: responder,
  27. task: task,
  28. }
  29. }
  30. func (re *ResponderExecutor) GetName() string {
  31. return re.task.GetName()
  32. }
  33. func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error {
  34. ctx := re.task.GetStreamContext()
  35. logger := ctx.GetLogger()
  36. sctx, ok := ctx.(StreamCheckpointContext)
  37. if !ok {
  38. return fmt.Errorf("invalid context for checkpoint responder, must be a StreamCheckpointContext")
  39. }
  40. name := re.GetName()
  41. logger.Debugf("Starting checkpoint %d on task %s", checkpointId, name)
  42. //create
  43. barrier := &Barrier{
  44. CheckpointId: checkpointId,
  45. OpId: name,
  46. }
  47. //broadcast barrier
  48. re.task.Broadcast(barrier)
  49. //Save key state to the global state
  50. err := sctx.Snapshot()
  51. if err != nil {
  52. return err
  53. }
  54. go func() {
  55. state := ACK
  56. err := sctx.SaveState(checkpointId)
  57. if err != nil {
  58. logger.Infof("save checkpoint error %s", err)
  59. state = DEC
  60. }
  61. signal := &Signal{
  62. Message: state,
  63. Barrier: Barrier{CheckpointId: checkpointId, OpId: name},
  64. }
  65. re.responder <- signal
  66. logger.Debugf("Complete checkpoint %d on task %s", checkpointId, name)
  67. }()
  68. return nil
  69. }