watermark_op.go 7.0 KB


  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "math"
  18. "sort"
  19. "github.com/lf-edge/ekuiper/internal/topo/node/metric"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/infra"
  23. )
  24. // WatermarkOp is used when event time is enabled.
  25. // It is used to align the event time of the input streams
  26. // It sends out the data in time order with watermark.
  27. type WatermarkOp struct {
  28. *defaultSinkNode
  29. statManager metric.StatManager
  30. // config
  31. lateTolerance int64
  32. sendWatermark bool
  33. // state
  34. events []*xsql.Tuple // All the cached events in order
  35. streamWMs map[string]int64
  36. lastWatermarkTs int64
  37. }
  38. var _ OperatorNode = &WatermarkOp{}
  39. const (
  40. WatermarkKey = "$$wartermark"
  41. EventInputKey = "$$eventinputs"
  42. StreamWMKey = "$$streamwms"
  43. )
  44. func NewWatermarkOp(name string, sendWatermark bool, streams []string, options *api.RuleOption) *WatermarkOp {
  45. wms := make(map[string]int64, len(streams))
  46. for _, s := range streams {
  47. wms[s] = options.LateTol
  48. }
  49. return &WatermarkOp{
  50. defaultSinkNode: &defaultSinkNode{
  51. input: make(chan interface{}, options.BufferLength),
  52. defaultNode: &defaultNode{
  53. outputs: make(map[string]chan<- interface{}),
  54. name: name,
  55. sendError: options.SendError,
  56. },
  57. },
  58. lateTolerance: options.LateTol,
  59. sendWatermark: sendWatermark,
  60. streamWMs: wms,
  61. }
  62. }
  63. func (w *WatermarkOp) Exec(ctx api.StreamContext, errCh chan<- error) {
  64. ctx.GetLogger().Debugf("watermark node %s is started", w.name)
  65. if len(w.outputs) <= 0 {
  66. infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
  67. return
  68. }
  69. stats, err := metric.NewStatManager(ctx, "op")
  70. if err != nil {
  71. infra.DrainError(ctx, fmt.Errorf("fail to create stat manager"), errCh)
  72. return
  73. }
  74. w.statManager = stats
  75. w.statManagers = []metric.StatManager{stats}
  76. w.ctx = ctx
  77. // restore state
  78. if s, err := ctx.GetState(WatermarkKey); err == nil && s != nil {
  79. if si, ok := s.(int64); ok {
  80. w.lastWatermarkTs = si
  81. } else {
  82. infra.DrainError(ctx, fmt.Errorf("restore watermark state `lastWatermarkTs` %v error, invalid type", s), errCh)
  83. return
  84. }
  85. }
  86. if s, err := ctx.GetState(EventInputKey); err == nil {
  87. switch st := s.(type) {
  88. case []*xsql.Tuple:
  89. w.events = st
  90. ctx.GetLogger().Infof("Restore watermark events state %+v", st)
  91. case nil:
  92. ctx.GetLogger().Debugf("Restore watermark events state, nothing")
  93. default:
  94. infra.DrainError(ctx, fmt.Errorf("restore watermark event state %v error, invalid type", st), errCh)
  95. return
  96. }
  97. } else {
  98. ctx.GetLogger().Warnf("Restore watermark event state fails: %s", err)
  99. }
  100. if s, err := ctx.GetState(StreamWMKey); err == nil && s != nil {
  101. if si, ok := s.(map[string]int64); ok {
  102. w.streamWMs = si
  103. } else {
  104. infra.DrainError(ctx, fmt.Errorf("restore watermark stream keys state %v error, invalid type", s), errCh)
  105. return
  106. }
  107. }
  108. ctx.GetLogger().Infof("Start with state lastWatermarkTs: %d", w.lastWatermarkTs)
  109. go func() {
  110. err := infra.SafeRun(func() error {
  111. for {
  112. select {
  113. case <-ctx.Done():
  114. ctx.GetLogger().Infof("watermark node %s is finished", w.name)
  115. return nil
  116. case item, opened := <-w.input:
  117. if !opened {
  118. w.statManager.IncTotalExceptions("input channel closed")
  119. break
  120. }
  121. processed := false
  122. if item, processed = w.preprocess(item); processed {
  123. break
  124. }
  125. switch d := item.(type) {
  126. case error:
  127. _ = w.Broadcast(d)
  128. w.statManager.IncTotalExceptions(d.Error())
  129. case *xsql.Tuple:
  130. w.statManager.IncTotalRecordsIn()
  131. // Start the first event processing.
  132. // Later a series of events may send out in order
  133. w.statManager.ProcessTimeStart()
  134. // whether to drop the late event
  135. if w.track(ctx, d.Emitter, d.GetTimestamp()) {
  136. // If not drop, check if it can be sent out
  137. w.addAndTrigger(ctx, d)
  138. }
  139. default:
  140. e := fmt.Errorf("run watermark op error: expect *xsql.Tuple type but got %[1]T(%[1]v)", d)
  141. _ = w.Broadcast(e)
  142. w.statManager.IncTotalExceptions(e.Error())
  143. }
  144. }
  145. }
  146. })
  147. if err != nil {
  148. infra.DrainError(ctx, err, errCh)
  149. }
  150. }()
  151. }
  152. func (w *WatermarkOp) track(ctx api.StreamContext, emitter string, ts int64) bool {
  153. ctx.GetLogger().Debugf("watermark generator track event from topic %s at %d", emitter, ts)
  154. watermark, ok := w.streamWMs[emitter]
  155. if !ok || ts > watermark {
  156. w.streamWMs[emitter] = ts
  157. _ = ctx.PutState(StreamWMKey, w.streamWMs)
  158. }
  159. r := ts >= w.lastWatermarkTs
  160. return r
  161. }
  162. // Add an event and check if watermark proceeds
  163. // If yes, send out all events before the watermark
  164. func (w *WatermarkOp) addAndTrigger(ctx api.StreamContext, d *xsql.Tuple) {
  165. // Insert into the sorted array, should be faster than append then sort
  166. if len(w.events) == 0 {
  167. w.events = append(w.events, d)
  168. } else {
  169. index := sort.Search(len(w.events), func(i int) bool {
  170. return w.events[i].GetTimestamp() > d.GetTimestamp()
  171. })
  172. w.events = append(w.events, nil)
  173. copy(w.events[index+1:], w.events[index:])
  174. w.events[index] = d
  175. }
  176. watermark := w.computeWatermarkTs()
  177. ctx.GetLogger().Debugf("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
  178. // Make sure watermark time proceeds
  179. if watermark > w.lastWatermarkTs {
  180. // Send out all events before the watermark
  181. if watermark >= w.events[0].GetTimestamp() {
  182. // Find out the last event to send in this watermark change
  183. c := len(w.events)
  184. for i, e := range w.events {
  185. if e.GetTimestamp() > watermark {
  186. c = i
  187. break
  188. }
  189. }
  190. // Send out all events before the watermark
  191. for i := 0; i < c; i++ {
  192. if i > 0 { // The first event processing time start at the beginning of event receiving
  193. w.statManager.ProcessTimeStart()
  194. }
  195. _ = w.Broadcast(w.events[i])
  196. ctx.GetLogger().Debug("send out event", w.events[i].GetTimestamp())
  197. w.statManager.IncTotalRecordsOut()
  198. w.statManager.ProcessTimeEnd()
  199. }
  200. w.events = w.events[c:]
  201. _ = ctx.PutState(EventInputKey, w.events)
  202. }
  203. // Update watermark
  204. if w.sendWatermark {
  205. _ = w.Broadcast(&xsql.WatermarkTuple{Timestamp: watermark})
  206. }
  207. w.lastWatermarkTs = watermark
  208. _ = ctx.PutState(WatermarkKey, w.lastWatermarkTs)
  209. ctx.GetLogger().Debugf("scan watermark event at %d", watermark)
  210. }
  211. }
  212. // watermark is the minimum timestamp of all input topics
  213. func (w *WatermarkOp) computeWatermarkTs() int64 {
  214. var ts int64 = math.MaxInt64
  215. for _, wm := range w.streamWMs {
  216. if ts > wm {
  217. ts = wm
  218. }
  219. }
  220. return ts - w.lateTolerance
  221. }