12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- // Copyright 2021-2022 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package checkpoint
- import (
- "fmt"
- "github.com/lf-edge/ekuiper/pkg/infra"
- )
- type Responder interface {
- TriggerCheckpoint(checkpointId int64) error
- GetName() string
- }
- type ResponderExecutor struct {
- responder chan<- *Signal
- task StreamTask
- }
- func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor {
- return &ResponderExecutor{
- responder: responder,
- task: task,
- }
- }
- func (re *ResponderExecutor) GetName() string {
- return re.task.GetName()
- }
- func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error {
- ctx := re.task.GetStreamContext()
- logger := ctx.GetLogger()
- sctx, ok := ctx.(StreamCheckpointContext)
- if !ok {
- return fmt.Errorf("invalid context for checkpoint responder, must be a StreamCheckpointContext")
- }
- name := re.GetName()
- logger.Debugf("Starting checkpoint %d on task %s", checkpointId, name)
- // create
- barrier := &Barrier{
- CheckpointId: checkpointId,
- OpId: name,
- }
- // broadcast barrier
- re.task.Broadcast(barrier)
- // Save key state to the global state
- err := sctx.Snapshot()
- if err != nil {
- return err
- }
- go infra.SafeRun(func() error {
- state := ACK
- err := sctx.SaveState(checkpointId)
- if err != nil {
- logger.Infof("save checkpoint error %s", err)
- state = DEC
- }
- signal := &Signal{
- Message: state,
- Barrier: Barrier{CheckpointId: checkpointId, OpId: name},
- }
- re.responder <- signal
- logger.Debugf("Complete checkpoint %d on task %s", checkpointId, name)
- return nil
- })
- return nil
- }
|