watermark_op.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "math"
  18. "sort"
  19. "github.com/lf-edge/ekuiper/internal/topo/node/metric"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/infra"
  23. )
  24. // WatermarkOp is used when event time is enabled.
  25. // It is used to align the event time of the input streams
  26. // It sends out the data in time order with watermark.
  27. type WatermarkOp struct {
  28. *defaultSinkNode
  29. statManager metric.StatManager
  30. // config
  31. lateTolerance int64
  32. // state
  33. events []*xsql.Tuple // All the cached events in order
  34. streamWMs map[string]int64
  35. lastWatermarkTs int64
  36. }
  37. var _ OperatorNode = &WatermarkOp{}
  38. const (
  39. WatermarkKey = "$$wartermark"
  40. EventInputKey = "$$eventinputs"
  41. StreamWMKey = "$$streamwms"
  42. )
  43. func NewWatermarkOp(name string, streams []string, options *api.RuleOption) *WatermarkOp {
  44. wms := make(map[string]int64, len(streams))
  45. return &WatermarkOp{
  46. defaultSinkNode: &defaultSinkNode{
  47. input: make(chan interface{}, options.BufferLength),
  48. defaultNode: &defaultNode{
  49. outputs: make(map[string]chan<- interface{}),
  50. name: name,
  51. sendError: options.SendError,
  52. },
  53. },
  54. lateTolerance: options.LateTol,
  55. streamWMs: wms,
  56. }
  57. }
  58. func (w *WatermarkOp) Exec(ctx api.StreamContext, errCh chan<- error) {
  59. ctx.GetLogger().Debugf("watermark node %s is started", w.name)
  60. if len(w.outputs) <= 0 {
  61. infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
  62. return
  63. }
  64. stats, err := metric.NewStatManager(ctx, "op")
  65. if err != nil {
  66. infra.DrainError(ctx, fmt.Errorf("fail to create stat manager"), errCh)
  67. return
  68. }
  69. w.statManager = stats
  70. w.ctx = ctx
  71. // restore state
  72. if s, err := ctx.GetState(WatermarkKey); err == nil && s != nil {
  73. if si, ok := s.(int64); ok {
  74. w.lastWatermarkTs = si
  75. } else {
  76. infra.DrainError(ctx, fmt.Errorf("restore watermark state `lastWatermarkTs` %v error, invalid type", s), errCh)
  77. return
  78. }
  79. }
  80. if s, err := ctx.GetState(EventInputKey); err == nil {
  81. switch st := s.(type) {
  82. case []*xsql.Tuple:
  83. w.events = st
  84. ctx.GetLogger().Infof("Restore watermark events state %+v", st)
  85. case nil:
  86. ctx.GetLogger().Debugf("Restore watermark events state, nothing")
  87. default:
  88. infra.DrainError(ctx, fmt.Errorf("restore watermark event state %v error, invalid type", st), errCh)
  89. return
  90. }
  91. } else {
  92. ctx.GetLogger().Warnf("Restore watermark event state fails: %s", err)
  93. }
  94. if s, err := ctx.GetState(StreamWMKey); err == nil && s != nil {
  95. if si, ok := s.(map[string]int64); ok {
  96. w.streamWMs = si
  97. } else {
  98. infra.DrainError(ctx, fmt.Errorf("restore watermark stream keys state %v error, invalid type", s), errCh)
  99. return
  100. }
  101. }
  102. ctx.GetLogger().Infof("Start with state lastWatermarkTs: %d", w.lastWatermarkTs)
  103. go func() {
  104. err := infra.SafeRun(func() error {
  105. for {
  106. select {
  107. case <-ctx.Done():
  108. ctx.GetLogger().Infof("watermark node %s is finished", w.name)
  109. return nil
  110. case item, opened := <-w.input:
  111. if !opened {
  112. w.statManager.IncTotalExceptions("input channel closed")
  113. break
  114. }
  115. processed := false
  116. if item, processed = w.preprocess(item); processed {
  117. break
  118. }
  119. switch d := item.(type) {
  120. case error:
  121. _ = w.Broadcast(d)
  122. w.statManager.IncTotalExceptions(d.Error())
  123. case *xsql.Tuple:
  124. w.statManager.IncTotalRecordsIn()
  125. // Start the first event processing.
  126. // Later a series of events may send out in order
  127. w.statManager.ProcessTimeStart()
  128. // whether to drop the late event
  129. if w.track(ctx, d.Emitter, d.GetTimestamp()) {
  130. // If not drop, check if it can be sent out
  131. w.addAndTrigger(ctx, d)
  132. }
  133. default:
  134. e := fmt.Errorf("run watermark op error: expect *xsql.Tuple type but got %[1]T(%[1]v)", d)
  135. _ = w.Broadcast(e)
  136. w.statManager.IncTotalExceptions(e.Error())
  137. }
  138. }
  139. }
  140. })
  141. if err != nil {
  142. infra.DrainError(ctx, err, errCh)
  143. }
  144. }()
  145. }
  146. func (w *WatermarkOp) track(ctx api.StreamContext, emitter string, ts int64) bool {
  147. ctx.GetLogger().Debugf("watermark generator track event from topic %s at %d", emitter, ts)
  148. watermark, ok := w.streamWMs[emitter]
  149. if !ok || ts > watermark {
  150. w.streamWMs[emitter] = ts
  151. _ = ctx.PutState(StreamWMKey, w.streamWMs)
  152. }
  153. r := ts >= w.lastWatermarkTs
  154. return r
  155. }
  156. // Add an event and check if watermark proceeds
  157. // If yes, send out all events before the watermark
  158. func (w *WatermarkOp) addAndTrigger(ctx api.StreamContext, d *xsql.Tuple) {
  159. w.events = append(w.events, d)
  160. watermark := w.computeWatermarkTs()
  161. ctx.GetLogger().Debugf("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
  162. if watermark > w.lastWatermarkTs {
  163. sort.SliceStable(w.events, func(i, j int) bool {
  164. return w.events[i].GetTimestamp() < w.events[j].GetTimestamp()
  165. })
  166. // Find out the last event to send in this watermark change
  167. c := len(w.events)
  168. for i, e := range w.events {
  169. if e.GetTimestamp() > watermark {
  170. c = i
  171. break
  172. }
  173. }
  174. // Send out all events before the watermark
  175. for i := 0; i < c; i++ {
  176. if i > 0 { // The first event processing time start at the beginning of event receiving
  177. w.statManager.ProcessTimeStart()
  178. }
  179. _ = w.Broadcast(w.events[i])
  180. ctx.GetLogger().Debug("send out event", w.events[i].GetTimestamp())
  181. w.statManager.IncTotalRecordsOut()
  182. w.statManager.ProcessTimeEnd()
  183. }
  184. w.events = w.events[c:]
  185. _ = ctx.PutState(EventInputKey, w.events)
  186. _ = w.Broadcast(&xsql.WatermarkTuple{Timestamp: watermark})
  187. w.lastWatermarkTs = watermark
  188. _ = ctx.PutState(WatermarkKey, w.lastWatermarkTs)
  189. ctx.GetLogger().Debugf("scan watermark event at %d", watermark)
  190. }
  191. }
  192. // watermark is the minimum timestamp of all input topics
  193. func (w *WatermarkOp) computeWatermarkTs() int64 {
  194. var ts int64 = math.MaxInt64
  195. for _, wm := range w.streamWMs {
  196. if ts > wm {
  197. ts = wm
  198. }
  199. }
  200. return ts - w.lateTolerance
  201. }
  202. func (w *WatermarkOp) GetMetrics() [][]interface{} {
  203. if w.statManager != nil {
  204. return [][]interface{}{
  205. w.statManager.GetMetrics(),
  206. }
  207. } else {
  208. return nil
  209. }
  210. }