watermark_op.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "math"
  18. "sort"
  19. "github.com/lf-edge/ekuiper/internal/topo/node/metric"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/infra"
  23. )
  24. // WatermarkOp is used when event time is enabled.
  25. // It is used to align the event time of the input streams
  26. // It sends out the data in time order with watermark.
  27. type WatermarkOp struct {
  28. *defaultSinkNode
  29. statManager metric.StatManager
  30. // config
  31. lateTolerance int64
  32. sendWatermark bool
  33. // state
  34. events []*xsql.Tuple // All the cached events in order
  35. streamWMs map[string]int64
  36. lastWatermarkTs int64
  37. }
  38. var _ OperatorNode = &WatermarkOp{}
  // State keys used by WatermarkOp with ctx.PutState / ctx.GetState.
// NOTE: the "wartermark" spelling is intentional to keep; these strings are
// state keys, so renaming them would presumably make previously saved state
// unreadable on restore. Confirm before changing.
const (
	// WatermarkKey stores the last emitted watermark timestamp (int64).
	WatermarkKey = "$$wartermark"
	// EventInputKey stores the buffered, not yet released events ([]*xsql.Tuple).
	EventInputKey = "$$eventinputs"
	// StreamWMKey stores the latest event timestamp per stream (map[string]int64).
	StreamWMKey = "$$streamwms"
)
  44. func NewWatermarkOp(name string, sendWatermark bool, streams []string, options *api.RuleOption) *WatermarkOp {
  45. wms := make(map[string]int64, len(streams))
  46. for _, s := range streams {
  47. wms[s] = options.LateTol
  48. }
  49. return &WatermarkOp{
  50. defaultSinkNode: &defaultSinkNode{
  51. input: make(chan interface{}, options.BufferLength),
  52. defaultNode: &defaultNode{
  53. outputs: make(map[string]chan<- interface{}),
  54. name: name,
  55. sendError: options.SendError,
  56. },
  57. },
  58. lateTolerance: options.LateTol,
  59. sendWatermark: sendWatermark,
  60. streamWMs: wms,
  61. }
  62. }
  63. func (w *WatermarkOp) Exec(ctx api.StreamContext, errCh chan<- error) {
  64. ctx.GetLogger().Debugf("watermark node %s is started", w.name)
  65. if len(w.outputs) <= 0 {
  66. infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
  67. return
  68. }
  69. stats, err := metric.NewStatManager(ctx, "op")
  70. if err != nil {
  71. infra.DrainError(ctx, fmt.Errorf("fail to create stat manager"), errCh)
  72. return
  73. }
  74. w.statManager = stats
  75. w.ctx = ctx
  76. // restore state
  77. if s, err := ctx.GetState(WatermarkKey); err == nil && s != nil {
  78. if si, ok := s.(int64); ok {
  79. w.lastWatermarkTs = si
  80. } else {
  81. infra.DrainError(ctx, fmt.Errorf("restore watermark state `lastWatermarkTs` %v error, invalid type", s), errCh)
  82. return
  83. }
  84. }
  85. if s, err := ctx.GetState(EventInputKey); err == nil {
  86. switch st := s.(type) {
  87. case []*xsql.Tuple:
  88. w.events = st
  89. ctx.GetLogger().Infof("Restore watermark events state %+v", st)
  90. case nil:
  91. ctx.GetLogger().Debugf("Restore watermark events state, nothing")
  92. default:
  93. infra.DrainError(ctx, fmt.Errorf("restore watermark event state %v error, invalid type", st), errCh)
  94. return
  95. }
  96. } else {
  97. ctx.GetLogger().Warnf("Restore watermark event state fails: %s", err)
  98. }
  99. if s, err := ctx.GetState(StreamWMKey); err == nil && s != nil {
  100. if si, ok := s.(map[string]int64); ok {
  101. w.streamWMs = si
  102. } else {
  103. infra.DrainError(ctx, fmt.Errorf("restore watermark stream keys state %v error, invalid type", s), errCh)
  104. return
  105. }
  106. }
  107. ctx.GetLogger().Infof("Start with state lastWatermarkTs: %d", w.lastWatermarkTs)
  108. go func() {
  109. err := infra.SafeRun(func() error {
  110. for {
  111. select {
  112. case <-ctx.Done():
  113. ctx.GetLogger().Infof("watermark node %s is finished", w.name)
  114. return nil
  115. case item, opened := <-w.input:
  116. if !opened {
  117. w.statManager.IncTotalExceptions("input channel closed")
  118. break
  119. }
  120. processed := false
  121. if item, processed = w.preprocess(item); processed {
  122. break
  123. }
  124. switch d := item.(type) {
  125. case error:
  126. _ = w.Broadcast(d)
  127. w.statManager.IncTotalExceptions(d.Error())
  128. case *xsql.Tuple:
  129. w.statManager.IncTotalRecordsIn()
  130. // Start the first event processing.
  131. // Later a series of events may send out in order
  132. w.statManager.ProcessTimeStart()
  133. // whether to drop the late event
  134. if w.track(ctx, d.Emitter, d.GetTimestamp()) {
  135. // If not drop, check if it can be sent out
  136. w.addAndTrigger(ctx, d)
  137. }
  138. default:
  139. e := fmt.Errorf("run watermark op error: expect *xsql.Tuple type but got %[1]T(%[1]v)", d)
  140. _ = w.Broadcast(e)
  141. w.statManager.IncTotalExceptions(e.Error())
  142. }
  143. }
  144. }
  145. })
  146. if err != nil {
  147. infra.DrainError(ctx, err, errCh)
  148. }
  149. }()
  150. }
  151. func (w *WatermarkOp) track(ctx api.StreamContext, emitter string, ts int64) bool {
  152. ctx.GetLogger().Debugf("watermark generator track event from topic %s at %d", emitter, ts)
  153. watermark, ok := w.streamWMs[emitter]
  154. if !ok || ts > watermark {
  155. w.streamWMs[emitter] = ts
  156. _ = ctx.PutState(StreamWMKey, w.streamWMs)
  157. }
  158. r := ts >= w.lastWatermarkTs
  159. return r
  160. }
  161. // Add an event and check if watermark proceeds
  162. // If yes, send out all events before the watermark
  163. func (w *WatermarkOp) addAndTrigger(ctx api.StreamContext, d *xsql.Tuple) {
  164. // Insert into the sorted array, should be faster than append then sort
  165. if len(w.events) == 0 {
  166. w.events = append(w.events, d)
  167. } else {
  168. index := sort.Search(len(w.events), func(i int) bool {
  169. return w.events[i].GetTimestamp() > d.GetTimestamp()
  170. })
  171. w.events = append(w.events, nil)
  172. copy(w.events[index+1:], w.events[index:])
  173. w.events[index] = d
  174. }
  175. watermark := w.computeWatermarkTs()
  176. ctx.GetLogger().Debugf("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
  177. // Make sure watermark time proceeds
  178. if watermark > w.lastWatermarkTs {
  179. // Send out all events before the watermark
  180. if watermark >= w.events[0].GetTimestamp() {
  181. // Find out the last event to send in this watermark change
  182. c := len(w.events)
  183. for i, e := range w.events {
  184. if e.GetTimestamp() > watermark {
  185. c = i
  186. break
  187. }
  188. }
  189. // Send out all events before the watermark
  190. for i := 0; i < c; i++ {
  191. if i > 0 { // The first event processing time start at the beginning of event receiving
  192. w.statManager.ProcessTimeStart()
  193. }
  194. _ = w.Broadcast(w.events[i])
  195. ctx.GetLogger().Debug("send out event", w.events[i].GetTimestamp())
  196. w.statManager.IncTotalRecordsOut()
  197. w.statManager.ProcessTimeEnd()
  198. }
  199. w.events = w.events[c:]
  200. _ = ctx.PutState(EventInputKey, w.events)
  201. }
  202. // Update watermark
  203. if w.sendWatermark {
  204. _ = w.Broadcast(&xsql.WatermarkTuple{Timestamp: watermark})
  205. }
  206. w.lastWatermarkTs = watermark
  207. _ = ctx.PutState(WatermarkKey, w.lastWatermarkTs)
  208. ctx.GetLogger().Debugf("scan watermark event at %d", watermark)
  209. }
  210. }
  211. // watermark is the minimum timestamp of all input topics
  212. func (w *WatermarkOp) computeWatermarkTs() int64 {
  213. var ts int64 = math.MaxInt64
  214. for _, wm := range w.streamWMs {
  215. if ts > wm {
  216. ts = wm
  217. }
  218. }
  219. return ts - w.lateTolerance
  220. }
  221. func (w *WatermarkOp) GetMetrics() [][]interface{} {
  222. if w.statManager != nil {
  223. return [][]interface{}{
  224. w.statManager.GetMetrics(),
  225. }
  226. } else {
  227. return nil
  228. }
  229. }