responder.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package checkpoint
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/pkg/infra"
  18. )
  // Responder is the contract for a named component that can be asked to
  // trigger a checkpoint.
  type Responder interface {
  	// GetName returns the name identifying this responder.
  	GetName() string
  	// TriggerCheckpoint asks the responder to start the checkpoint with the
  	// given id and reports any error encountered while doing so.
  	TriggerCheckpoint(checkpointId int64) error
  }
  23. type ResponderExecutor struct {
  24. responder chan<- *Signal
  25. task StreamTask
  26. }
  27. func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor {
  28. return &ResponderExecutor{
  29. responder: responder,
  30. task: task,
  31. }
  32. }
  33. func (re *ResponderExecutor) GetName() string {
  34. return re.task.GetName()
  35. }
  36. func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error {
  37. ctx := re.task.GetStreamContext()
  38. logger := ctx.GetLogger()
  39. sctx, ok := ctx.(StreamCheckpointContext)
  40. if !ok {
  41. return fmt.Errorf("invalid context for checkpoint responder, must be a StreamCheckpointContext")
  42. }
  43. name := re.GetName()
  44. logger.Debugf("Starting checkpoint %d on task %s", checkpointId, name)
  45. // create
  46. barrier := &Barrier{
  47. CheckpointId: checkpointId,
  48. OpId: name,
  49. }
  50. // broadcast barrier
  51. re.task.Broadcast(barrier)
  52. // Save key state to the global state
  53. err := sctx.Snapshot()
  54. if err != nil {
  55. return err
  56. }
  57. go infra.SafeRun(func() error {
  58. state := ACK
  59. err := sctx.SaveState(checkpointId)
  60. if err != nil {
  61. logger.Infof("save checkpoint error %s", err)
  62. state = DEC
  63. }
  64. signal := &Signal{
  65. Message: state,
  66. Barrier: Barrier{CheckpointId: checkpointId, OpId: name},
  67. }
  68. re.responder <- signal
  69. logger.Debugf("Complete checkpoint %d on task %s", checkpointId, name)
  70. return nil
  71. })
  72. return nil
  73. }